diff --git a/src/ess/livedata/config/instruments/_ess.py b/src/ess/livedata/config/instruments/_ess.py index ab0e04ecf..55ec36b40 100644 --- a/src/ess/livedata/config/instruments/_ess.py +++ b/src/ess/livedata/config/instruments/_ess.py @@ -40,6 +40,12 @@ def _make_dev_area_detectors( } +def _make_dev_logs(*, instrument: str, log_names: list[str]) -> StreamLUT: + """Create log stream mapping for dev mode where source_name equals internal name.""" + topic = f'{instrument}_motion' + return {InputStreamKey(topic=topic, source_name=name): name for name in log_names} + + def _make_dev_beam_monitors( instrument: str, monitor_names: list[str] | None = None ) -> StreamLUT: @@ -80,9 +86,8 @@ def make_dev_stream_mapping( detector_names: list[str], area_detector_names: list[str] | None = None, monitor_names: list[str] | None = None, + log_names: list[str] | None = None, ) -> StreamMapping: - motion_topic = f'{instrument}_motion' - log_topics = {motion_topic} area_detectors = ( _make_dev_area_detectors( instrument=instrument, area_detectors=area_detector_names @@ -90,12 +95,17 @@ def make_dev_stream_mapping( if area_detector_names else {} ) + logs = ( + _make_dev_logs(instrument=instrument, log_names=log_names) + if log_names + else None + ) return StreamMapping( instrument=instrument, detectors=_make_dev_detectors(instrument=instrument, detectors=detector_names), monitors=_make_dev_beam_monitors(instrument, monitor_names=monitor_names), area_detectors=area_detectors, - log_topics=log_topics, + logs=logs, **_make_livedata_topics(instrument), ) @@ -106,6 +116,5 @@ def make_common_stream_mapping_inputs( return { 'instrument': instrument, 'monitors': _make_cbm_monitors(instrument, monitor_names=monitor_names), - 'log_topics': None, **_make_livedata_topics(instrument), } diff --git a/src/ess/livedata/config/instruments/bifrost/specs.py b/src/ess/livedata/config/instruments/bifrost/specs.py index b8a3ebb61..7bb0ca686 100644 --- a/src/ess/livedata/config/instruments/bifrost/specs.py +++ b/src/ess/livedata/config/instruments/bifrost/specs.py @@ -231,11 +231,137 @@ class QMapOutputs(WorkflowOutputsBase): '113_psd1', ] -# Some example motions used for testing, probably not reflecting reality +# Combined f144 log stream configuration. +# Maps internal name -> {source: Kafka source name, units: unit string, topic: topic} +# Generated using: python -m ess.livedata.nexus_helpers --generate --topic +f144_log_streams: dict[str, dict[str, str]] = { + # Motion streams (topic: bifrost_motion) + 'attenuator_1': { + 'source': 'BIFRO-AttChg:MC-Pne-01:ShtAuxBits07', + 'units': 'dimensionless', + 'topic': 'bifrost_motion', + }, + 'attenuator_2': { + 'source': 'BIFRO-AttChg:MC-Pne-02:ShtAuxBits07', + 'units': 'dimensionless', + 'topic': 'bifrost_motion', + }, + 'attenuator_3': { + 'source': 'BIFRO-AttChg:MC-Pne-03:ShtAuxBits07', + 'units': 'dimensionless', + 'topic': 'bifrost_motion', + }, + 'detector_rotation': { + 'source': 'BIFRO-DtCar:MC-RotZ-01:Mtr.RBV', + 'units': 'deg', + 'topic': 'bifrost_motion', + }, + 'get_lost_tube': { + 'source': 'BIFRO-InBm:MC-Pne-01:ShtAuxBits07', + 'units': 'dimensionless', + 'topic': 'bifrost_motion', + }, + 'goniometer_x': { + 'source': 'BIFRO-SpGon:MC-RotX-01:Mtr.RBV', + 'units': 'deg', + 'topic': 'bifrost_motion', + }, + 'goniometer_y': { + 'source': 'BIFRO-SpGon:MC-RotY-01:Mtr.RBV', + 'units': 'deg', + 'topic': 'bifrost_motion', + }, + 'sample_rotation': { + 'source': 'BIFRO-SpRot:MC-RotZ-01:Mtr.RBV', + 'units': 'deg', + 'topic': 'bifrost_motion', + }, + 'slit_bottom': { + 'source': 'BIFRO-SpSl1:MC-SlZm-01:PzMtr.RBV', + 'units': 'mm', + 'topic': 'bifrost_motion', + }, + 'slit_left': { + 'source': 'BIFRO-SpSl1:MC-SlYp-01:PzMtr.RBV', + 'units': 'mm', + 'topic': 'bifrost_motion', + }, + 'slit_right': { + 'source': 'BIFRO-SpSl1:MC-SlYm-01:PzMtr.RBV', + 'units': 'mm', + 'topic': 'bifrost_motion', + }, + 'slit_top': { + 'source': 'BIFRO-SpSl1:MC-SlZp-01:PzMtr.RBV', + 'units': 'mm', + 'topic': 'bifrost_motion', + }, + 'slit_position': { + 'source': 'BIFRO-SpSl1:MC-LinX-01:PzMtr-PosReadback', + 'units': 'mm', + 'topic': 'bifrost_motion', + }, + # Sample environment streams (topic: bifrost_sample_env) + 'heater_1': { + 'source': 'YMIR-SEE:SE-LS336-004:HTR1', + 'units': 'W', + 'topic': 'bifrost_sample_env', + }, + 'heater_2': { + 'source': 'YMIR-SEE:SE-LS336-004:HTR2', + 'units': 'W', + 'topic': 'bifrost_sample_env', + }, + 'temperature_0': { + 'source': 'YMIR-SEE:SE-LS336-004:KRDG0', + 'units': 'K', + 'topic': 'bifrost_sample_env', + }, + 'temperature_1': { + 'source': 'YMIR-SEE:SE-LS336-004:KRDG1', + 'units': 'K', + 'topic': 'bifrost_sample_env', + }, + 'temperature_2': { + 'source': 'YMIR-SEE:SE-LS336-004:KRDG2', + 'units': 'K', + 'topic': 'bifrost_sample_env', + }, + 'temperature_3': { + 'source': 'YMIR-SEE:SE-LS336-004:KRDG3', + 'units': 'K', + 'topic': 'bifrost_sample_env', + }, + 'temperature_setpoint': { + 'source': 'YMIR-SEE:SE-LS336-004:SETP_S1', + 'units': 'K', + 'topic': 'bifrost_sample_env', + }, + 'sensor_0': { + 'source': 'YMIR-SEE:SE-LS336-004:SRDG0', + 'units': 'V', + 'topic': 'bifrost_sample_env', + }, + 'sensor_1': { + 'source': 'YMIR-SEE:SE-LS336-004:SRDG1', + 'units': 'V', + 'topic': 'bifrost_sample_env', + }, + 'sensor_2': { + 'source': 'YMIR-SEE:SE-LS336-004:SRDG2', + 'units': 'V', + 'topic': 'bifrost_sample_env', + }, + 'sensor_3': { + 'source': 'YMIR-SEE:SE-LS336-004:SRDG3', + 'units': 'V', + 'topic': 'bifrost_sample_env', + }, +} + +# Derived from f144_log_streams for use by the Instrument f144_attribute_registry = { - 'detector_rotation': {'units': 'deg'}, - 'sample_rotation': {'units': 'deg'}, - 'sample_temperature': {'units': 'K'}, + name: {'units': info['units']} for name, info in f144_log_streams.items() } # Create instrument diff --git a/src/ess/livedata/config/instruments/bifrost/streams.py b/src/ess/livedata/config/instruments/bifrost/streams.py index f457b4545..07b2cc176 100644 --- a/src/ess/livedata/config/instruments/bifrost/streams.py +++ b/src/ess/livedata/config/instruments/bifrost/streams.py @@ -12,7 +12,7 @@ from ess.livedata.kafka import InputStreamKey, StreamLUT, StreamMapping from .._ess import make_common_stream_mapping_inputs, make_dev_stream_mapping -from .specs import monitors +from .specs import f144_log_streams, monitors def _bifrost_generator() -> Generator[tuple[str, tuple[int, int]]]: @@ -61,16 +61,31 @@ def _make_bifrost_detectors() -> StreamLUT: } +def _make_bifrost_logs() -> StreamLUT: + """ + Bifrost log data mapping. + + Derives StreamLUT from f144_log_streams, mapping Kafka source names + (EPICS PV names) to ESSlivedata-internal stream names. + """ + return { + InputStreamKey(topic=info['topic'], source_name=info['source']): internal_name + for internal_name, info in f144_log_streams.items() + } + + stream_mapping = { StreamingEnv.DEV: make_dev_stream_mapping( 'bifrost', detector_names=list(detector_fakes), monitor_names=monitors, + log_names=list(f144_log_streams.keys()), ), StreamingEnv.PROD: StreamMapping( **make_common_stream_mapping_inputs( instrument='bifrost', monitor_names=monitors ), detectors=_make_bifrost_detectors(), + logs=_make_bifrost_logs(), ), } diff --git a/src/ess/livedata/config/instruments/dummy/streams.py b/src/ess/livedata/config/instruments/dummy/streams.py index b2d811438..08037976c 100644 --- a/src/ess/livedata/config/instruments/dummy/streams.py +++ b/src/ess/livedata/config/instruments/dummy/streams.py @@ -6,6 +6,7 @@ from ess.livedata.kafka import InputStreamKey, StreamLUT, StreamMapping from .._ess import make_common_stream_mapping_inputs, make_dev_stream_mapping +from .specs import instrument detector_fakes = {'panel_0': (1, 128**2)} @@ -34,6 +35,7 @@ def _make_dummy_area_detectors() -> StreamLUT: 'dummy', detector_names=list(detector_fakes), area_detector_names=list(area_detector_fakes), + log_names=list(instrument.f144_attribute_registry.keys()), ), StreamingEnv.PROD: StreamMapping( **make_common_stream_mapping_inputs(instrument='dummy'), diff --git a/src/ess/livedata/kafka/stream_mapping.py b/src/ess/livedata/kafka/stream_mapping.py index e9b6be2bc..cd3fe2091 100644 --- a/src/ess/livedata/kafka/stream_mapping.py +++ b/src/ess/livedata/kafka/stream_mapping.py @@ -32,7 +32,7 @@ def __init__( detectors: StreamLUT, monitors: StreamLUT, area_detectors: StreamLUT | None = None, - log_topics: set[KafkaTopic] | None = None, + logs: StreamLUT | None = None, livedata_commands_topic: str, livedata_data_topic: str, livedata_responses_topic: str, @@ -43,9 +43,7 @@ def __init__( self._detectors = detectors self._monitors = monitors self._area_detectors = area_detectors or {} - # Currently we simply reuse the source_name as the stream name - self._logs = None - self._log_topics = log_topics or set() + self._logs = logs self._livedata_commands_topic = livedata_commands_topic self._livedata_data_topic = livedata_data_topic self._livedata_responses_topic = livedata_responses_topic @@ -94,8 +92,10 @@ def monitor_topics(self) -> set[KafkaTopic]: @property def log_topics(self) -> set[KafkaTopic]: - """Returns the list of log topics.""" - return self._log_topics + """Returns the set of log topics.""" + if self._logs is None: + return set() + return {stream.topic for stream in self._logs.keys()} @property def detectors(self) -> StreamLUT: diff --git a/src/ess/livedata/nexus_helpers.py b/src/ess/livedata/nexus_helpers.py index 10c9b2714..2d5de02b8 100644 --- a/src/ess/livedata/nexus_helpers.py +++ b/src/ess/livedata/nexus_helpers.py @@ -7,6 +7,7 @@ from __future__ import annotations +import re from dataclasses import dataclass from pathlib import Path @@ -31,6 +32,8 @@ class StreamInfo: NeXus class of the parent group (e.g., 'NXdetector', 'NXdisk_chopper'). writer_module: FileWriter module used to write this data (e.g., 'ev44', 'f144', 'tdct'). + units: + Units string from the 'units' attribute, if present. """ group_path: str @@ -39,6 +42,7 @@ class StreamInfo: nx_class: str parent_nx_class: str writer_module: str + units: str = '' def _decode_attr(value) -> str: @@ -89,6 +93,12 @@ def _collect_stream_nodes(name: str, node) -> None: if node.parent is not None and 'NX_class' in node.parent.attrs: parent_nx_class = _decode_attr(node.parent.attrs['NX_class']) + # Try to get units from 'value' dataset within the group (common for f144) + units = '' + if 'value' in node and isinstance(node['value'], h5py.Dataset): + if 'units' in node['value'].attrs: + units = _decode_attr(node['value'].attrs['units']) + stream_infos.append( StreamInfo( group_path=name, @@ -97,6 +107,7 @@ def _collect_stream_nodes(name: str, node) -> None: nx_class=nx_class, parent_nx_class=parent_nx_class, writer_module=writer_module, + units=units, ) ) @@ -110,19 +121,166 @@ def _collect_stream_nodes(name: str, node) -> None: return stream_infos +def suggest_internal_name(info: StreamInfo) -> str: + """Suggest an internal name based on the group path. + + Uses the parent group name (last path component before 'value', 'idle_flag', etc.) + as the basis for the internal name. + """ + parts = info.group_path.split('/') + # For paths like '.../rotation_stage/value', use 'rotation_stage' + for i, part in enumerate(parts): + if part in ('value', 'idle_flag', 'target_value'): + if i > 0: + return parts[i - 1] + # Fallback: use last non-value component + return parts[-2] if len(parts) >= 2 else parts[-1] + + +def filter_f144_streams( + infos: list[StreamInfo], + *, + topic_filter: str | None = None, + exclude_patterns: list[str] | None = None, +) -> list[StreamInfo]: + """Filter stream infos to only f144 writer module entries. + + Parameters + ---------- + infos: + List of StreamInfo objects to filter. + topic_filter: + If provided, only include streams from this topic. + exclude_patterns: + List of regex patterns to exclude from results (matched against group_path). + """ + exclude_patterns = exclude_patterns or [] + exclude_regexes = [re.compile(p) for p in exclude_patterns] + + result = [] + for info in infos: + if info.writer_module != 'f144': + continue + if topic_filter and info.topic != topic_filter: + continue + if any(regex.search(info.group_path) for regex in exclude_regexes): + continue + result.append(info) + return result + + +def generate_f144_log_streams_code( + infos: list[StreamInfo], + *, + topic: str, + variable_name: str = 'f144_log_streams', +) -> str: + """Generate Python code for f144_log_streams dictionary. + + The generated dictionary maps internal names to source, units, and topic info, + which can be used to derive both f144_attribute_registry and StreamLUT. + + Parameters + ---------- + infos: + List of StreamInfo objects (should be pre-filtered to f144 only). + topic: + The Kafka topic these streams come from. + variable_name: + Name for the generated variable. + """ + # Group by suggested internal name, preferring 'value' entries + by_name: dict[str, StreamInfo] = {} + for info in infos: + if info.topic != topic: + continue + name = suggest_internal_name(info) + # Prefer 'value' entries over 'idle_flag' or 'target_value' + if name not in by_name or info.group_path.endswith('/value'): + by_name[name] = info + + lines = [ + "# Generated from NeXus file - review and adjust names as needed", + f"# Topic: {topic}", + f"{variable_name}: dict[str, dict[str, str]] = {{", + ] + + for name in sorted(by_name.keys()): + info = by_name[name] + units = info.units or 'dimensionless' + entry = ( + f" '{name}': {{'source': '{info.source}', 'units': '{units}', " + f"'topic': '{topic}'}}" + ) + lines.append(f"{entry},") + + lines.append("}") + return '\n'.join(lines) + + if __name__ == '__main__': - # Example usage + import argparse import sys - if len(sys.argv) < 2: - sys.stderr.write( - "Usage: python -m ess.livedata.nexus_helpers \n" - ) - sys.exit(1) + parser = argparse.ArgumentParser( + description='Extract streaming metadata from NeXus files', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # List all streaming groups + python -m ess.livedata.nexus_helpers file.hdf - file_path = sys.argv[1] - infos = extract_stream_info(file_path) + # List only f144 streams + python -m ess.livedata.nexus_helpers file.hdf --f144 - sys.stdout.write(f"Found {len(infos)} streaming data groups\n\n") - for info in infos: - sys.stdout.write(f"{info}\n") + # Generate code for motion topic + python -m ess.livedata.nexus_helpers file.hdf --generate --topic bifrost_motion + + # Filter by topic and exclude choppers + python -m ess.livedata.nexus_helpers file.hdf --f144 --topic bifrost_motion \\ + --exclude "Chopper" +""", + ) + parser.add_argument('nexus_file', help='Path to NeXus/HDF5 file') + parser.add_argument( + '--f144', action='store_true', help='Only show f144 (log data) streams' + ) + parser.add_argument('--topic', help='Filter by Kafka topic') + parser.add_argument( + '--exclude', + action='append', + default=[], + help='Regex pattern to exclude (can be used multiple times)', + ) + parser.add_argument( + '--generate', + action='store_true', + help='Generate Python code for f144_log_streams dict', + ) + parser.add_argument( + '--var-name', + default='f144_log_streams', + help='Variable name for generated code (default: f144_log_streams)', + ) + + args = parser.parse_args() + + infos = extract_stream_info(args.nexus_file) + + if args.f144 or args.generate: + infos = filter_f144_streams( + infos, topic_filter=args.topic, exclude_patterns=args.exclude + ) + + if args.generate: + if not args.topic: + sys.stderr.write("Error: --generate requires --topic\n") + sys.exit(1) + code = generate_f144_log_streams_code( + infos, topic=args.topic, variable_name=args.var_name + ) + sys.stdout.write(code + '\n') + else: + sys.stdout.write(f"Found {len(infos)} streaming data groups\n\n") + for info in infos: + sys.stdout.write(f"{info}\n") diff --git a/tests/config/streams_test.py b/tests/config/streams_test.py index 9428dc30a..0581012c6 100644 --- a/tests/config/streams_test.py +++ b/tests/config/streams_test.py @@ -4,6 +4,7 @@ from ess.livedata.config import streams from ess.livedata.config.instruments import available_instruments +from ess.livedata.kafka import InputStreamKey, StreamMapping @pytest.mark.parametrize('instrument', available_instruments()) @@ -18,3 +19,40 @@ def test_get_stream_mapping_production(instrument: str) -> None: stream_mapping = streams.get_stream_mapping(instrument=instrument, dev=False) assert stream_mapping is not None assert isinstance(stream_mapping, streams.StreamMapping) + + +class TestStreamMappingLogTopics: + def test_log_topics_returns_empty_set_when_logs_is_none(self) -> None: + mapping = StreamMapping( + instrument='test', + detectors={}, + monitors={}, + logs=None, + livedata_commands_topic='test_commands', + livedata_data_topic='test_data', + livedata_responses_topic='test_responses', + livedata_roi_topic='test_roi', + livedata_status_topic='test_status', + ) + assert mapping.log_topics == set() + assert mapping.logs is None + + def test_log_topics_returns_topics_from_logs_lut(self) -> None: + logs = { + InputStreamKey(topic='motion', source_name='motor1'): 'detector_rotation', + InputStreamKey(topic='motion', source_name='motor2'): 'sample_rotation', + InputStreamKey(topic='sensors', source_name='temp1'): 'sample_temperature', + } + mapping = StreamMapping( + instrument='test', + detectors={}, + monitors={}, + logs=logs, + livedata_commands_topic='test_commands', + livedata_data_topic='test_data', + livedata_responses_topic='test_responses', + livedata_roi_topic='test_roi', + livedata_status_topic='test_status', + ) + assert mapping.log_topics == {'motion', 'sensors'} + assert mapping.logs == logs diff --git a/tests/nexus_helpers_test.py b/tests/nexus_helpers_test.py index b944f6e60..9c44c4f1d 100644 --- a/tests/nexus_helpers_test.py +++ b/tests/nexus_helpers_test.py @@ -7,7 +7,14 @@ import h5py import pytest -from ess.livedata.nexus_helpers import StreamInfo, _decode_attr, extract_stream_info +from ess.livedata.nexus_helpers import ( + StreamInfo, + _decode_attr, + extract_stream_info, + filter_f144_streams, + generate_f144_log_streams_code, + suggest_internal_name, +) class TestDecodeAttr: @@ -369,3 +376,199 @@ def test_dataclass_inequality(self) -> None: ) assert info1 != info2 + + def test_units_default_is_empty_string(self) -> None: + info = StreamInfo( + group_path='entry/detector/events', + topic='detector_events', + source='detector_1', + nx_class='NXlog', + parent_nx_class='NXdetector', + writer_module='f144', + ) + assert info.units == '' + + def test_units_can_be_set(self) -> None: + info = StreamInfo( + group_path='entry/motor/value', + topic='motion', + source='motor_1', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + units='degrees', + ) + assert info.units == 'degrees' + + +class TestSuggestInternalName: + def test_extracts_name_before_value(self) -> None: + info = StreamInfo( + group_path='entry/instrument/rotation_stage/value', + topic='motion', + source='motor_1', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + ) + assert suggest_internal_name(info) == 'rotation_stage' + + def test_extracts_name_before_idle_flag(self) -> None: + info = StreamInfo( + group_path='entry/instrument/rotation_stage/idle_flag', + topic='motion', + source='motor_1', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + ) + assert suggest_internal_name(info) == 'rotation_stage' + + def test_preserves_r0_suffix(self) -> None: + info = StreamInfo( + group_path='entry/instrument/detector_tank_angle_r0/value', + topic='motion', + source='motor_1', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + ) + assert suggest_internal_name(info) == 'detector_tank_angle_r0' + + def test_preserves_t0_suffix(self) -> None: + info = StreamInfo( + group_path='entry/instrument/sample_stage_t0/value', + topic='motion', + source='motor_1', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + ) + assert suggest_internal_name(info) == 'sample_stage_t0' + + +class TestFilterF144Streams: + @pytest.fixture + def mixed_streams(self) -> list[StreamInfo]: + return [ + StreamInfo( + group_path='entry/detector/events', + topic='detector', + source='det_1', + nx_class='NXevent_data', + parent_nx_class='NXdetector', + writer_module='ev44', + ), + StreamInfo( + group_path='entry/motor/value', + topic='motion', + source='motor_1', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + units='degrees', + ), + StreamInfo( + group_path='entry/chopper/rotation', + topic='choppers', + source='chopper_1', + nx_class='NXlog', + parent_nx_class='NXdisk_chopper', + writer_module='f144', + units='Hz', + ), + ] + + def test_filters_to_f144_only(self, mixed_streams) -> None: + result = filter_f144_streams(mixed_streams) + assert len(result) == 2 + assert all(info.writer_module == 'f144' for info in result) + + def test_filters_by_topic(self, mixed_streams) -> None: + result = filter_f144_streams(mixed_streams, topic_filter='motion') + assert len(result) == 1 + assert result[0].source == 'motor_1' + + def test_excludes_by_pattern(self, mixed_streams) -> None: + result = filter_f144_streams(mixed_streams, exclude_patterns=['chopper']) + assert len(result) == 1 + assert result[0].source == 'motor_1' + + +class TestGenerateF144LogStreamsCode: + def test_generates_valid_python_dict(self) -> None: + infos = [ + StreamInfo( + group_path='entry/motor/value', + topic='motion', + source='MOTOR:PV:RBV', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + units='degrees', + ), + ] + code = generate_f144_log_streams_code(infos, topic='motion') + assert "f144_log_streams: dict[str, dict[str, str]] = {" in code + assert ( + "'motor': {'source': 'MOTOR:PV:RBV', 'units': 'degrees', 'topic': 'motion'}" + in code + ) + + def test_uses_dimensionless_for_empty_units(self) -> None: + infos = [ + StreamInfo( + group_path='entry/switch/value', + topic='motion', + source='SWITCH:PV', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + units='', + ), + ] + code = generate_f144_log_streams_code(infos, topic='motion') + assert "'units': 'dimensionless', 'topic': 'motion'" in code + + def test_prefers_value_over_idle_flag(self) -> None: + infos = [ + StreamInfo( + group_path='entry/motor/idle_flag', + topic='motion', + source='MOTOR:DMOV', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + units='', + ), + StreamInfo( + group_path='entry/motor/value', + topic='motion', + source='MOTOR:RBV', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + units='degrees', + ), + ] + code = generate_f144_log_streams_code(infos, topic='motion') + # Should use 'value' entry, not 'idle_flag' + assert 'MOTOR:RBV' in code + assert 'MOTOR:DMOV' not in code + + def test_custom_variable_name(self) -> None: + infos = [ + StreamInfo( + group_path='entry/motor/value', + topic='motion', + source='MOTOR:RBV', + nx_class='NXlog', + parent_nx_class='NXpositioner', + writer_module='f144', + units='deg', + ), + ] + code = generate_f144_log_streams_code( + infos, topic='motion', variable_name='my_streams' + ) + assert 'my_streams: dict[str, dict[str, str]] = {' in code