From 9943d3c90f085a436a87638abdb37a6bf4b37f1a Mon Sep 17 00:00:00 2001 From: Dustin Spicuzza Date: Sat, 27 Dec 2025 15:32:37 -0500 Subject: [PATCH] Isolated mode rewrite - Run isolated tests in parallel - Add tests for pytest plugins --- pyfrc/mains/cli_test.py | 19 +- pyfrc/test_support/pytest_dist_plugin.py | 192 -------- .../pytest_isolated_tests_plugin.py | 462 ++++++++++++++++++ tests/conftest.py | 1 + tests/old_test_tests.py | 162 ------ tests/test_pytest_plugins.py | 421 ++++++++++++++++ 6 files changed, 900 insertions(+), 357 deletions(-) delete mode 100644 pyfrc/test_support/pytest_dist_plugin.py create mode 100644 pyfrc/test_support/pytest_isolated_tests_plugin.py delete mode 100644 tests/old_test_tests.py create mode 100644 tests/test_pytest_plugins.py diff --git a/pyfrc/mains/cli_test.py b/pyfrc/mains/cli_test.py index 42f17fed..daf20eac 100644 --- a/pyfrc/mains/cli_test.py +++ b/pyfrc/mains/cli_test.py @@ -28,6 +28,8 @@ class _TryAgain(Exception): # # main test class # + + class PyFrcTest: """ Executes unit tests on the robot code using a special pytest plugin @@ -58,6 +60,13 @@ def __init__(self, parser=None): nargs="*", help="To pass args to pytest, specify --, then the args", ) + parser.add_argument( + "-j", + "--jobs", + type=int, + default=-1, + help="Maximum isolated robot processes (default: max CPUs - 1)", + ) def run( self, @@ -69,6 +78,7 @@ def run( coverage_mode: bool, verbose: bool, pytest_args: typing.List[str], + jobs: int, ): if isolated is None: pyproject_path = project_path / "pyproject.toml" @@ -107,6 +117,7 @@ def run( coverage_mode, verbose, pytest_args, + jobs, ) except _TryAgain: return self._run_test( @@ -118,6 +129,7 @@ def run( coverage_mode, verbose, pytest_args, + jobs, ) def _run_test( @@ -130,6 +142,7 @@ def _run_test( coverage_mode: bool, verbose: bool, pytest_args: typing.List[str], + jobs: int, ): # find test directory, change current directory so pytest can find the tests # -> assume that tests reside in tests or ../tests @@ -159,13 +172,13 @@ def _run_test( try: if isolated: - from ..test_support import pytest_dist_plugin + from ..test_support import pytest_isolated_tests_plugin retv = pytest.main( pytest_args, plugins=[ - pytest_dist_plugin.DistPlugin( - robot_class, main_file, builtin, verbose + pytest_isolated_tests_plugin.IsolatedTestsPlugin( + robot_class, main_file, builtin, verbose, jobs ) ], ) diff --git a/pyfrc/test_support/pytest_dist_plugin.py b/pyfrc/test_support/pytest_dist_plugin.py deleted file mode 100644 index 329dda49..00000000 --- a/pyfrc/test_support/pytest_dist_plugin.py +++ /dev/null @@ -1,192 +0,0 @@ -import multiprocessing -import os -import pathlib -import sys -import time - -from typing import Type - -import pytest - -import robotpy.main -import robotpy.logconfig -import wpilib - - -from .pytest_plugin import PyFrcPlugin - - -def _enable_faulthandler(): - # - # In the event of a segfault, faulthandler will dump the currently - # active stack so you can figure out what went wrong. - # - # Additionally, on non-Windows platforms we register a SIGUSR2 - # handler -- if you send the robot process a SIGUSR2, then - # faulthandler will dump all of your current stacks. This can - # be really useful for figuring out things like deadlocks. - # - - import logging - - logger = logging.getLogger("faulthandler") - - try: - # These should work on all platforms - import faulthandler - - faulthandler.enable() - except Exception as e: - logger.warning("Could not enable faulthandler: %s", e) - return - - try: - import signal - - faulthandler.register(signal.SIGUSR2) - logger.info("registered SIGUSR2 for PID %s", os.getpid()) - except Exception: - return - - -def _run_test( - item_nodeid, config_args, robot_class, robot_file, verbose, pipe, root_path -): - """This function runs in a subprocess""" - robotpy.logconfig.configure_logging(verbose) - _enable_faulthandler() - - # This is used by getDeployDirectory, so make sure it gets fixed - robotpy.main.robot_py_path = robot_file - - os.chdir(root_path) - - # keep the plugin around because it has a reference to the robot - # and we don't want it to die and deadlock - plugin = PyFrcPlugin(robot_class, robot_file, True) - - ec = pytest.main( - [item_nodeid, "--no-header", *config_args], - plugins=[plugin], - ) - - # ensure output is printed out - # .. TODO could implement pytest_runtestloop and send the - # test result back to the parent and print it there? - sys.stdout.flush() - - # Don't let the process die, let the parent kill us to avoid - # python interpreter badness - pipe.send(ec) - - # ensure that the gc doesn't collect the plugin.. - while plugin: - time.sleep(100) - - -def _run_test_in_new_process( - test_function, config, robot_class, robot_file, builtin_tests, verbose -): - """Run a test function in a new process.""" - - config_args = config.invocation_params.args - if builtin_tests: - item_nodeid = f"{config_args[0]}::{test_function.name}" - config_args = config_args[1:] - else: - item_nodeid = test_function.nodeid - - parent, child = multiprocessing.Pipe() - - process = multiprocessing.Process( - target=_run_test, - args=( - item_nodeid, - config_args, - robot_class, - robot_file, - verbose, - child, - config.rootpath, - ), - ) - process.start() - try: - ec = parent.recv() - finally: - process.kill() - - if ec != 0: - pytest.fail( - f"Test failed in subprocess: {item_nodeid} (exit code {ec})", - pytrace=False, - ) - - -def _make_runtest(item, config, robot_class, robot_file, builtin_tests, verbose): - def isolated_runtest(): - _run_test_in_new_process( - item, config, robot_class, robot_file, builtin_tests, verbose - ) - - return isolated_runtest - - -class DistPlugin: - - def __init__( - self, - robot_class: Type[wpilib.RobotBase], - robot_file: pathlib.Path, - builtin_tests: bool, - verbose: bool, - ) -> None: - self._robot_class = robot_class - self._robot_file = robot_file - self._builtin_tests = builtin_tests - self._verbose = verbose - - @pytest.hookimpl(tryfirst=True) - def pytest_collection_modifyitems( - self, - session: pytest.Session, - config: pytest.Config, - items: list[pytest.Function], - ): - """Modify collected test items to run each in a new process.""" - - multiprocessing.set_start_method("spawn") - - for item in items: - if "robot" not in item.fixturenames: - continue - - # Overwrite the runtest protocol for each item that needs the robot - item.runtest = _make_runtest( - item, - config, - self._robot_class, - self._robot_file, - self._builtin_tests, - self._verbose, - ) - - # - # These fixtures match the ones in PyFrcPlugin but these have no effect - # - - @pytest.fixture(scope="function") - def robot(self): - pass - - @pytest.fixture(scope="function") - def control(self, reraise, robot): - pass - - @pytest.fixture() - def robot_file(self): - pass - - @pytest.fixture() - def robot_path(self): - pass diff --git a/pyfrc/test_support/pytest_isolated_tests_plugin.py b/pyfrc/test_support/pytest_isolated_tests_plugin.py new file mode 100644 index 00000000..57852b3a --- /dev/null +++ b/pyfrc/test_support/pytest_isolated_tests_plugin.py @@ -0,0 +1,462 @@ +import dataclasses +import logging +import multiprocessing +import multiprocessing.connection +import os +import pathlib +import signal +import sys +import time + +from typing import Type + +import pytest + +import robotpy.main +import wpilib + + +from .pytest_plugin import PyFrcPlugin + + +def _enable_faulthandler(): + # + # In the event of a segfault, faulthandler will dump the currently + # active stack so you can figure out what went wrong. + # + # Additionally, on non-Windows platforms we register a SIGUSR2 + # handler -- if you send the robot process a SIGUSR2, then + # faulthandler will dump all of your current stacks. This can + # be really useful for figuring out things like deadlocks. + # + + import logging + + logger = logging.getLogger("faulthandler") + + try: + # These should work on all platforms + import faulthandler + + faulthandler.enable() + except Exception as e: + logger.warning("Could not enable faulthandler: %s", e) + return + + try: + faulthandler.register(signal.SIGUSR2) + logger.info("registered SIGUSR2 for PID %s", os.getpid()) + except Exception: + return + + +class WorkerPlugin: + """ + This pytest plugin runs in the isolated process that runs a test that uses the + robot fixture. + + Heavily borrowed from pytest-xdist WorkerInteractor + """ + + def __init__(self, channel: multiprocessing.connection.Connection): + self.channel = channel + + def sendevent(self, name: str, **kwargs: object): + self.channel.send((name, kwargs)) + + @pytest.hookimpl(wrapper=True) + def pytest_sessionstart(self, session: pytest.Session): + self.config = session.config + return (yield) + + @pytest.hookimpl + def pytest_internalerror(self, excrepr: object): + formatted_error = str(excrepr) + for line in formatted_error.split("\n"): + print("IERROR>", line, file=sys.stderr) + self.sendevent("internal_error", formatted_error=formatted_error) + + @pytest.hookimpl + def pytest_runtest_logstart( + self, + nodeid: str, + location: tuple[str, int | None, str], + ): + self.sendevent("logstart", nodeid=nodeid, location=location) + + @pytest.hookimpl + def pytest_runtest_logfinish( + self, + nodeid: str, + location: tuple[str, int | None, str], + ): + self.sendevent("logfinish", nodeid=nodeid, location=location) + + @pytest.hookimpl + def pytest_runtest_logreport(self, report: pytest.TestReport): + data = self.config.hook.pytest_report_to_serializable( + config=self.config, report=report + ) + self.sendevent("testreport", data=data) + + +def _run_test( + item_nodeid, config_args, robot_class, robot_file, verbose, pipe, root_path +): + """This function runs in a subprocess""" + logging.root.addHandler(logging.NullHandler()) + logging.root.setLevel(logging.DEBUG if verbose else logging.INFO) + + _enable_faulthandler() + + # This is used by getDeployDirectory, so make sure it gets fixed + robotpy.main.robot_py_path = robot_file + + os.chdir(root_path) + + # keep the plugins around because it has a reference to the robot + # and we don't want it to die and deadlock + plugin = PyFrcPlugin(robot_class, robot_file, True) + worker_plugin = WorkerPlugin(pipe) + + ec = pytest.main( + [item_nodeid, "--no-header", "-p", "no:terminalreporter", *config_args], + plugins=[plugin, worker_plugin], + ) + + # ensure output is printed out + sys.stdout.flush() + + # Don't let the process die, let the parent kill us to avoid + # python interpreter badness + worker_plugin.sendevent("finished", exit_code=ec) + pipe.close() + + # ensure that the gc doesn't collect the plugin.. + while plugin: + time.sleep(100) + + +@dataclasses.dataclass +class IsolatedTestJob: + item: pytest.Function + conn: multiprocessing.connection.Connection + process: multiprocessing.Process + start_time: float + exit_code: int | None = None + + finished: bool = False + + # set when the worker indicates it has finished + worker_completed: bool = False + + def set_exit_code(self, ec: int): + if self.exit_code is None: + self.exit_code = ec + + +class IsolatedTestsPlugin: + """ + This pytest plugin runs any test that uses the 'robot' fixture in an + isolated subprocess + """ + + def __init__( + self, + robot_class: Type[wpilib.RobotBase], + robot_file: pathlib.Path, + builtin_tests: bool, + verbose: bool, + parallelism: int, + ): + self._robot_class = robot_class + self._robot_file = robot_file + self._builtin_tests = builtin_tests + self._verbose = verbose + + if parallelism < 1: + try: + parallelism = multiprocessing.cpu_count() - 1 + except NotImplementedError: + parallelism = 1 + + self._parallelism = max(1, parallelism) + self._shouldstop = False + + @pytest.hookimpl(wrapper=True) + def pytest_sessionstart(self, session: pytest.Session): + self._config = session.config + self._maxfail: int = self._config.getvalue("maxfail") + self._countfailures = 0 + self._shouldstop = False + + multiprocessing.set_start_method("spawn") + + return (yield) + + @pytest.hookimpl + def pytest_runtestloop(self, session: pytest.Session) -> bool: + if ( + session.testsfailed + and not session.config.option.continue_on_collection_errors + ): + raise session.Interrupted( + f"{session.testsfailed} error{'s' if session.testsfailed != 1 else ''} during collection" + ) + + if session.config.option.collectonly: + return True + + running: list[IsolatedTestJob] = [] + deferred: list[pytest.Function] = [] + try: + # Start any tests that use the robot fixture first. Tests that don't + # use the robot fixture will be ran later + for item in session.items: + assert isinstance(item, pytest.Function) + if "robot" not in item.fixturenames: + deferred.append(item) + continue + + while len(running) >= self._parallelism: + self._wait_for_jobs(running, session) + + running.append(self._start_isolated_test(item)) + self._maybe_raise(session) + + # Run the in-process tests now while the robot tests are finishing + for idx, item in enumerate(deferred): + nextitem = deferred[idx + 1] if idx + 1 < len(deferred) else None + session.config.hook.pytest_runtest_protocol( + item=item, nextitem=nextitem + ) + self._maybe_raise(session) + + while running: + self._wait_for_jobs(running, session) + finally: + for job in running: + self._cleanup_job(job) + + return True + + def _start_isolated_test(self, item: pytest.Function) -> IsolatedTestJob: + + config_args = self._config.invocation_params.args + if self._builtin_tests: + nodeid = f"{config_args[0]}::{item.name}" + config_args = config_args[1:] + else: + nodeid = item.nodeid + + pconn, cconn = multiprocessing.Pipe() + process = multiprocessing.Process( + target=_run_test, + args=( + nodeid, + config_args, + self._robot_class, + self._robot_file, + self._verbose, + cconn, + self._config.rootpath, + ), + ) + process.start() + cconn.close() + + return IsolatedTestJob( + item=item, + conn=pconn, + process=process, + start_time=time.time(), + ) + + def _wait_for_jobs(self, running: list[IsolatedTestJob], session: pytest.Session): + if not running: + return + + ready = multiprocessing.connection.wait([job.conn for job in running]) + + for conn in ready: + job = next(job for job in running if job.conn == conn) + self._process_job_messages(job, session) + if job.finished: + running.remove(job) + self._finalize_job(job, session) + + def _process_job_messages(self, job: IsolatedTestJob, session: pytest.Session): + while not job.finished: + try: + if not job.conn.poll(): + break + callname, kwargs = job.conn.recv() + except (IOError, EOFError) as e: + job.finished = True + break + + method = "worker_" + callname + call = getattr(self, method) + call(job, **kwargs) + self._maybe_raise(session) + + if not job.process.is_alive(): + job.finished = True + + def _finalize_job(self, job: IsolatedTestJob, session: pytest.Session): + self._cleanup_job(job) + + if job.worker_completed: + return + + stop = time.time() + duration = stop - job.start_time + + ec = job.exit_code + longrepr = None + if ec is None: + longrepr = "subprocess failed for unknown reason" + else: + if ec < 0: + try: + signal_name = signal.strsignal(-ec) + longrepr = f"subprocess exited due to signal {-ec}: {signal_name}" + except ValueError: + pass + + if longrepr is None: + longrepr = f"subprocess exited with exit code {ec}" + + report = pytest.TestReport( + nodeid=job.item.nodeid, + location=job.item.location, + keywords=job.item.keywords, + outcome="failed", + longrepr=longrepr, + when="call", + duration=duration, + start=job.start_time, + stop=stop, + ) + + self._config.hook.pytest_runtest_logstart( + nodeid=job.item.nodeid, location=job.item.location + ) + self._config.hook.pytest_runtest_logreport(report=report) + self._config.hook.pytest_runtest_logfinish( + nodeid=job.item.nodeid, location=job.item.location + ) + + self._maybe_raise(session) + + def _cleanup_job(self, job: IsolatedTestJob): + try: + job.conn.close() + except Exception: + pass + + if job.process.is_alive(): + job.process.kill() + + try: + job.process.join(timeout=1) + except TimeoutError: + pass + + ec = job.process.exitcode + if ec is not None: + job.set_exit_code(ec) + + job.process.close() + + def _maybe_raise(self, session: pytest.Session): + if self._shouldstop: + raise session.Interrupted(self._shouldstop) + if session.shouldfail: + raise session.Failed(session.shouldfail) + if session.shouldstop: + raise session.Interrupted(session.shouldstop) + + # + # Worker dispatch functions (copied from pytest-xdist) + # + + def worker_logstart( + self, + job: IsolatedTestJob, + nodeid: str, + location: tuple[str, int | None, str], + ): + """Emitted when a node calls the pytest_runtest_logstart hook.""" + if self._config.option.verbose > 0: + return + self._config.hook.pytest_runtest_logstart(nodeid=nodeid, location=location) + + def worker_logfinish( + self, + job: IsolatedTestJob, + nodeid: str, + location: tuple[str, int | None, str], + ): + """Emitted when a node calls the pytest_runtest_logfinish hook.""" + if self._config.option.verbose > 0: + return + self._config.hook.pytest_runtest_logfinish(nodeid=nodeid, location=location) + + def worker_testreport(self, job: IsolatedTestJob, data: object): + """Emitted when a node calls the pytest_runtest_logreport hook.""" + + report = self._config.hook.pytest_report_from_serializable( + config=self._config, data=data + ) + self._config.hook.pytest_runtest_logreport(report=report) + self._handlefailures(report) + + def worker_internal_error(self, job: IsolatedTestJob, formatted_error: str): + """Emitted when a node calls the pytest_internalerror hook.""" + for line in formatted_error.split("\n"): + print("IERROR>", line, file=sys.stderr) + + job.finished = True + if not self._shouldstop: + self._shouldstop = "internal error in worker" + + def worker_finished(self, job: IsolatedTestJob, exit_code: object | None = None): + """Emitted when a node finishes running.""" + if exit_code is not None: + job.exit_code = int(exit_code) + + job.worker_completed = True + job.finished = True + + def _handlefailures(self, rep: pytest.TestReport): + if rep.failed: + self._countfailures += 1 + if ( + self._maxfail + and self._countfailures >= self._maxfail + and not self._shouldstop + ): + self._shouldstop = f"stopping after {self._countfailures} failures" + + # + # These fixtures match the ones in PyFrcPlugin but these have no effect + # + + @pytest.fixture(scope="function") + def robot(self): + pass + + @pytest.fixture(scope="function") + def control(self, reraise, robot): + pass + + @pytest.fixture() + def robot_file(self) -> pathlib.Path: + """The absolute filename your robot code is started from""" + return self._robot_file + + @pytest.fixture() + def robot_path(self) -> pathlib.Path: + """The absolute directory that your robot code is located at""" + return self._robot_file.parent diff --git a/tests/conftest.py b/tests/conftest.py index 5c9012b9..51745220 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,3 +8,4 @@ # networktables.NetworkTables.startTestMode() # wpilib.DriverStation._reset() +pytest_plugins = "pytester" diff --git a/tests/old_test_tests.py b/tests/old_test_tests.py deleted file mode 100644 index cf4569ca..00000000 --- a/tests/old_test_tests.py +++ /dev/null @@ -1,162 +0,0 @@ -import contextlib - -import hal -import wpilib - -import pytest - -from pyfrc.test_support.pytest_plugin import PyFrcPlugin -from pyfrc.tests.basic import test_practice as _test_practice - - -@contextlib.contextmanager -def get_plugin(cls): - wpilib.DriverStation._reset() - - plugin = PyFrcPlugin(cls, None, None) - plugin.pytest_runtest_setup() - - try: - yield plugin, plugin.get_control() - finally: - plugin.pytest_runtest_teardown(None) - - -class SampleRobotInit(wpilib.SampleRobot): - def robotInit(self): - assert False - - -class SampleAutonomous(wpilib.SampleRobot): - def autonomous(self): - assert False - - -class SampleTeleop(wpilib.SampleRobot): - def operatorControl(self): - assert False - - -class IterativeRobotInit(wpilib.IterativeRobot): - def robotInit(self): - assert False - - -class IterativeAutoInit(wpilib.IterativeRobot): - def autonomousInit(self): - assert False - - -class IterativeAutoPeriodic(wpilib.IterativeRobot): - def autonomousPeriodic(self): - assert False - - -class IterativeTeleopInit(wpilib.IterativeRobot): - def teleopInit(self): - assert False - - -class IterativeTeleopPeriodic(wpilib.IterativeRobot): - def teleopPeriodic(self): - assert False - - -@pytest.mark.parametrize( - "cls", - [ - SampleRobotInit, - SampleAutonomous, - IterativeRobotInit, - IterativeAutoInit, - IterativeAutoPeriodic, - ], -) -def test_auto_failure(cls): - """Ensure that failures can be detected in autonomous mode""" - with get_plugin(cls) as (plugin, control): - control.set_autonomous(enabled=True) - with pytest.raises(AssertionError): - control.run_test(lambda tm: tm < 15) - - -@pytest.mark.parametrize( - "cls", - [ - SampleRobotInit, - SampleTeleop, - IterativeRobotInit, - IterativeTeleopInit, - IterativeTeleopPeriodic, - ], -) -def test_teleop_failure(cls): - """Ensure that failures can be detected in teleop mode""" - with get_plugin(cls) as (plugin, control): - control.set_operator_control(enabled=True) - with pytest.raises(AssertionError): - control.run_test(lambda tm: tm < 15) - - -class Iterative(wpilib.IterativeRobot): - def robotInit(self): - self.did_robot_init = True - - def disabledInit(self): - self.did_disabled_init = True - - def disabledPeriodic(self): - self.did_disabled_periodic = True - - def autonomousInit(self): - self.did_auto_init = True - - def autonomousPeriodic(self): - self.did_auto_periodic = True - - def teleopInit(self): - self.did_teleop_init = True - - def teleopPeriodic(self): - self.did_teleop_periodic = True - - -def test_iterative(): - """Ensure that all states of the iterative robot run""" - with get_plugin(Iterative) as (plugin, control): - _test_practice(plugin.get_control(), plugin.get_fake_time(), plugin.get_robot()) - - robot = plugin.get_robot() - assert robot.did_robot_init == True - assert robot.did_disabled_init == True - assert robot.did_disabled_periodic == True - assert robot.did_auto_init == True - assert robot.did_auto_periodic == True - assert robot.did_teleop_init == True - assert robot.did_teleop_periodic == True - - -class Sample(wpilib.SampleRobot): - def robotInit(self): - self.did_robot_init = True - - def disabled(self): - self.did_robot_disabled = True - - def autonomous(self): - self.did_autonomous = True - - def operatorControl(self): - self.did_operator = True - - -def test_sample(): - """Ensure that all states of the sample robot run""" - with get_plugin(Sample) as (plugin, control): - _test_practice(plugin.get_control(), plugin.get_fake_time(), plugin.get_robot()) - - robot = plugin.get_robot() - assert robot.did_robot_init == True - assert robot.did_robot_disabled == True - assert robot.did_autonomous == True - assert robot.did_operator == True diff --git a/tests/test_pytest_plugins.py b/tests/test_pytest_plugins.py new file mode 100644 index 00000000..730f1772 --- /dev/null +++ b/tests/test_pytest_plugins.py @@ -0,0 +1,421 @@ +import os +import pathlib +import sys + +import pytest + + +def _make_robot_module(pytester): + pytester.makepyfile( + robot_module=""" +import wpilib + + +class DummyRobot(wpilib.TimedRobot): + def __init__(self): + super().__init__() + self.did_init = True + + def robotInit(self): + self.did_init = True + + +class RobotInitFailed(wpilib.TimedRobot): + def robotInit(self): + assert False + + +class AutonomousPeriodicFailed(wpilib.TimedRobot): + def autonomousPeriodic(self): + assert False + + +class TeleopPeriodicFailed(wpilib.TimedRobot): + def teleopPeriodic(self): + assert False + + +class TeleopInitFailed(wpilib.TimedRobot): + def teleopInit(self): + assert False + + +class IterativeStateRobot(wpilib.TimedRobot): + def robotInit(self): + self.did_robot_init = True + + def disabledInit(self): + self.did_disabled_init = True + + def disabledPeriodic(self): + self.did_disabled_periodic = True + + def autonomousInit(self): + self.did_auto_init = True + + def autonomousPeriodic(self): + self.did_auto_periodic = True + + def teleopInit(self): + self.did_teleop_init = True + + def teleopPeriodic(self): + self.did_teleop_periodic = True + +""" + ) + + +def _configure_pyfrc_plugin(pytester, robot_class="DummyRobot"): + pytester.makeconftest( + f""" +import pathlib + +from pyfrc.test_support.pytest_plugin import PyFrcPlugin + +from robot_module import {robot_class} + + +def pytest_configure(config): + robot_file = pathlib.Path(__file__).resolve() + config.pluginmanager.register(PyFrcPlugin({robot_class}, robot_file, False)) +""" + ) + + +def _configure_isolated_plugin(pytester, parallelism=1, robot_class="DummyRobot"): + pytester.makeconftest( + f""" +import pathlib + +from pyfrc.test_support.pytest_isolated_tests_plugin import IsolatedTestsPlugin + +from robot_module import {robot_class} + +def pytest_configure(config): + if "--no-header" in config.invocation_params.args: + return + robot_file = pathlib.Path(__file__).resolve() + config.pluginmanager.register( + IsolatedTestsPlugin({robot_class}, robot_file, False, False, {parallelism}) + ) +""" + ) + + +def test_pyfrc_plugin_success(pytester): + _make_robot_module(pytester) + _configure_pyfrc_plugin(pytester) + pytester.makepyfile( + test_success=""" +def test_robot_fixture(robot): + assert robot.did_init +""" + ) + + result = pytester.runpytest("-vv") + + result.assert_outcomes(passed=1) + + +def test_pyfrc_plugin_failure_shows_output(pytester): + _make_robot_module(pytester) + _configure_pyfrc_plugin(pytester) + pytester.makepyfile( + test_failure=""" +def test_robot_failure(robot): + print("pyfrc failure output") + assert False +""" + ) + + result = pytester.runpytest("-vv") + + result.assert_outcomes(failed=1) + result.stdout.fnmatch_lines( + [ + "*test_failure.py::test_robot_failure FAILED*", + "*pyfrc failure output*", + ] + ) + + +def test_isolated_plugin_process_and_output(pytester): + _make_robot_module(pytester) + _configure_isolated_plugin(pytester) + pytester.makepyfile( + test_isolated=""" +import os + + +def test_non_robot_pid(): + with open("non_robot_pid.txt", "w") as fp: + fp.write(str(os.getpid())) + + +def test_robot_pid_one(robot): + with open("robot_pid_one.txt", "w") as fp: + fp.write(str(os.getpid())) + + +def test_robot_pid_two(robot): + with open("robot_pid_two.txt", "w") as fp: + fp.write(str(os.getpid())) + + +def test_robot_failure_output(robot): + print("isolated failure output") + assert False +""" + ) + + result = pytester.runpytest_subprocess("-vv") + + result.assert_outcomes(passed=3, failed=1) + result.stdout.fnmatch_lines( + [ + "*test_isolated.py::test_robot_failure_output FAILED*", + "*isolated failure output*", + ] + ) + + root = pathlib.Path(pytester.path) + main_pid = int(root.joinpath("non_robot_pid.txt").read_text()) + robot_pid_one = int(root.joinpath("robot_pid_one.txt").read_text()) + robot_pid_two = int(root.joinpath("robot_pid_two.txt").read_text()) + + assert robot_pid_one != main_pid + assert robot_pid_two != main_pid + assert robot_pid_one != robot_pid_two + + +def test_isolated_plugin_no_duplicate_verbose_output(pytester): + _make_robot_module(pytester) + _configure_isolated_plugin(pytester) + pytester.makepyfile( + test_isolated=""" +def test_non_robot(): + assert True + + +def test_robot_one(robot): + assert robot is not None + + +def test_robot_two(robot): + assert robot is not None +""" + ) + + result = pytester.runpytest_subprocess("-v") + + result.assert_outcomes(passed=3) + assert ( + sum(1 for line in result.outlines if "test_isolated.py::test_robot_one" in line) + == 1 + ) + assert ( + sum(1 for line in result.outlines if "test_isolated.py::test_robot_two" in line) + == 1 + ) + + +@pytest.mark.skipif( + sys.platform.startswith("win"), + reason="Process signal exits do not work on Windows", +) +def test_isolated_plugin_reports_signal_exit(pytester): + _make_robot_module(pytester) + _configure_isolated_plugin(pytester) + pytester.makepyfile( + test_isolated=""" +import os +import signal + + +def test_robot_signal_exit(robot): + os.kill(os.getpid(), signal.SIGTERM) +""" + ) + + result = pytester.runpytest_subprocess("-vv") + + result.assert_outcomes(failed=1) + result.stdout.fnmatch_lines( + [ + "*test_isolated.py::test_robot_signal_exit FAILED*", + "*Terminated*", + ] + ) + + +def test_isolated_plugin_shows_file_in_non_verbose_output(pytester): + _make_robot_module(pytester) + _configure_isolated_plugin(pytester) + pytester.makepyfile( + test_isolated=""" +def test_non_robot(): + assert True + + +def test_robot_one(robot): + assert robot is not None + + +def test_robot_two(robot): + assert robot is not None +""" + ) + + result = pytester.runpytest_subprocess() + + result.assert_outcomes(passed=3) + assert ( + sum(1 for line in result.outlines if line.startswith("test_isolated.py")) == 1 + ) + + +def test_isolated_plugin_maxfail_stops_early(pytester): + _make_robot_module(pytester) + _configure_isolated_plugin(pytester) + pytester.makepyfile( + test_isolated=""" +def test_robot_first(robot): + assert False + + +def test_robot_second(robot): + assert False +""" + ) + + result = pytester.runpytest_subprocess("-v", "-x") + + result.assert_outcomes(failed=1) + assert not any("test_robot_second" in line for line in result.outlines) + + +@pytest.mark.parametrize("isolated", [False, True]) +def test_builtin_pyfrc_tests_module(pytester, isolated): + _make_robot_module(pytester) + if isolated: + _configure_isolated_plugin(pytester, robot_class="DummyRobot") + else: + _configure_pyfrc_plugin(pytester, robot_class="DummyRobot") + pytester.makepyfile(pyfrc_test="from pyfrc.tests import *\n") + + result = pytester.runpytest_subprocess("-q") + + result.assert_outcomes(passed=4) + + +def _run_robot_suite(pytester, isolated, robot_class, test_source, *args): + _make_robot_module(pytester) + if isolated: + _configure_isolated_plugin(pytester, robot_class=robot_class) + else: + _configure_pyfrc_plugin(pytester, robot_class=robot_class) + pytester.makepyfile(test_robot=test_source) + return pytester.runpytest_subprocess(*args) + + +_ROBOT_INIT_FAILURES = [ + "RobotInitFailed", +] + +_AUTO_FAILURES = [ + "AutonomousPeriodicFailed", +] + +_TELEOP_FAILURES = [ + "TeleopPeriodicFailed", + "TeleopInitFailed", +] + + +@pytest.mark.parametrize("isolated", [False, True]) +@pytest.mark.parametrize("robot_class", _ROBOT_INIT_FAILURES) +def test_robot_init_failure_detection(pytester, isolated, robot_class): + result = _run_robot_suite( + pytester, + isolated, + robot_class, + """ +def test_robot_init_failure(robot, control): + with control.run_robot(): + pass +""", + "-vv", + ) + + result.assert_outcomes(failed=1) + + +@pytest.mark.parametrize("isolated", [False, True]) +@pytest.mark.parametrize("robot_class", _AUTO_FAILURES) +def test_autonomous_failure_detection(pytester, isolated, robot_class): + result = _run_robot_suite( + pytester, + isolated, + robot_class, + """ +def test_autonomous_failure(robot, control): + with control.run_robot(): + control.step_timing(seconds=0.4, autonomous=True, enabled=True) +""", + "-vv", + ) + + result.assert_outcomes(failed=1) + + +@pytest.mark.parametrize("isolated", [False, True]) +@pytest.mark.parametrize("robot_class", _TELEOP_FAILURES) +def test_teleop_failure_detection(pytester, isolated, robot_class): + result = _run_robot_suite( + pytester, + isolated, + robot_class, + """ +def test_teleop_failure(robot, control): + with control.run_robot(): + control.step_timing(seconds=0.4, autonomous=False, enabled=True) +""", + "-vv", + ) + + result.assert_outcomes(failed=1) + + +@pytest.mark.parametrize("isolated", [False, True]) +@pytest.mark.parametrize("robot_class", ["IterativeStateRobot"]) +def test_robot_state_transitions(pytester, isolated, robot_class): + expected = { + "IterativeStateRobot": [ + "did_robot_init", + "did_disabled_init", + "did_disabled_periodic", + "did_auto_init", + "did_auto_periodic", + "did_teleop_init", + "did_teleop_periodic", + ], + }[robot_class] + + result = _run_robot_suite( + pytester, + isolated, + robot_class, + f""" +def test_state_transitions(robot, control): + with control.run_robot(): + control.step_timing(seconds=0.4, autonomous=False, enabled=False) + control.step_timing(seconds=0.4, autonomous=True, enabled=True) + control.step_timing(seconds=0.4, autonomous=False, enabled=True) + for name in {expected!r}: + assert getattr(robot, name, False) +""", + "-vv", + ) + + result.assert_outcomes(passed=1)