diff --git a/tests/component/test_stream_readers_di.py b/tests/component/test_stream_readers_di.py index ee84de3d..007e7836 100644 --- a/tests/component/test_stream_readers_di.py +++ b/tests/component/test_stream_readers_di.py @@ -9,10 +9,7 @@ import nidaqmx import nidaqmx.system from nidaqmx.constants import LineGrouping -from nidaqmx.stream_readers import ( - DigitalMultiChannelReader, - DigitalSingleChannelReader, -) +from nidaqmx.stream_readers import DigitalMultiChannelReader, DigitalSingleChannelReader from nidaqmx.utils import flatten_channel_string @@ -400,6 +397,25 @@ def test___digital_multi_channel_reader___read_one_sample_multi_line___returns_v ) +def test___digital_multi_channel_reader___read_one_sample_multi_line_jagged___returns_valid_samples( + di_multi_channel_port_uint32_task: nidaqmx.Task, +) -> None: + reader = DigitalMultiChannelReader(di_multi_channel_port_uint32_task.in_stream) + num_channels = di_multi_channel_port_uint32_task.number_of_channels + samples_to_read = 256 + sample = numpy.full((num_channels, 32), False, dtype=numpy.bool_) + + data = [ + _read_and_copy(reader.read_one_sample_multi_line, sample) for _ in range(samples_to_read) + ] + + assert [ + [_bool_array_to_int(sample[chan, :]) for chan in range(num_channels)] for sample in data + ] == _get_expected_digital_port_data_sample_major( + di_multi_channel_port_uint32_task, samples_to_read + ) + + def test___digital_multi_channel_reader___read_one_sample_multi_line_with_wrong_dtype___raises_error_with_correct_dtype( di_multi_channel_multi_line_task: nidaqmx.Task, ) -> None: diff --git a/tests/component/test_stream_writers_do.py b/tests/component/test_stream_writers_do.py index 4a832270..a4a2d952 100644 --- a/tests/component/test_stream_writers_do.py +++ b/tests/component/test_stream_writers_do.py @@ -171,6 +171,19 @@ def di_port1_loopback_task( return task +@pytest.fixture +def di_port2_loopback_task( + generate_task: Callable[[], nidaqmx.Task], real_x_series_device: nidaqmx.system.Device +) -> nidaqmx.Task: + task = generate_task() + task.di_channels.add_di_chan( + real_x_series_device.di_ports[2].name, + line_grouping=LineGrouping.CHAN_FOR_ALL_LINES, + ) + _start_di_task(task) + return task + + @pytest.fixture def di_multi_channel_port_loopback_task( generate_task: Callable[[], nidaqmx.Task], real_x_series_device: nidaqmx.system.Device @@ -431,6 +444,34 @@ def test___digital_multi_channel_writer___write_one_sample_multi_line___updates_ assert di_multi_line_loopback_task.read() == datum +def test___digital_multi_channel_writer___write_one_sample_multi_line_jagged___updates_output( + di_port0_loopback_task: nidaqmx.Task, + di_port1_loopback_task: nidaqmx.Task, + di_port2_loopback_task: nidaqmx.Task, + generate_task: Callable[[], nidaqmx.Task], + real_x_series_device: nidaqmx.system.Device, +) -> None: + task = generate_task() + for port in real_x_series_device.do_ports: + task.do_channels.add_do_chan( + port.name, + line_grouping=LineGrouping.CHAN_FOR_ALL_LINES, + ) + _start_do_task(task, is_port=True, num_chans=task.number_of_channels) + writer = DigitalMultiChannelWriter(task.out_stream) + num_channels = task.number_of_channels + samples_to_write = 0xA5 + + # "sweep" up to the final value, the only one we'll validate + for datum in _get_digital_data(num_channels * 32, samples_to_write): + data_to_write = _int_to_bool_array(num_channels * 32, datum).reshape((num_channels, 32)) + writer.write_one_sample_multi_line(data_to_write) + + assert di_port0_loopback_task.read() == datum & 0xFFFFFFFF + assert di_port1_loopback_task.read() == (datum >> 32) & 0xFF + assert di_port2_loopback_task.read() == (datum >> 64) & 0xFF + + def test___digital_multi_channel_writer___write_one_sample_multi_line_with_wrong_dtype___raises_error_with_correct_dtype( do_multi_channel_multi_line_task: nidaqmx.Task, ) -> None: