diff --git a/src/ess/livedata/core/service.py b/src/ess/livedata/core/service.py
index ca1b4f46d..90e432f45 100644
--- a/src/ess/livedata/core/service.py
+++ b/src/ess/livedata/core/service.py
@@ -176,7 +176,7 @@ def _stop_impl(self) -> None:
 
     @staticmethod
     def setup_arg_parser(
-        description: str, *, dev_flag: bool = True, dashboard_auth: bool = False
+        description: str, *, dev_flag: bool = True
    ) -> argparse.ArgumentParser:
        parser = argparse.ArgumentParser(
            description=description,
@@ -201,17 +201,6 @@ def setup_arg_parser(
            default='INFO',
            help='Set the logging level',
        )
-        if dashboard_auth:
-            parser.add_argument(
-                '--basic-auth-password',
-                default=None,
-                help='Basic authentication password for the dashboard',
-            )
-            parser.add_argument(
-                '--basic-auth-cookie-secret',
-                default=None,
-                help='Basic authentication cookie secret for the dashboard',
-            )
        return parser
@@ -229,6 +218,7 @@ def get_env_defaults(
            # Convert --arg-name to LIVEDATA_ARG_NAME
            env_name = f"{prefix}_{action.dest.upper().replace('-', '_')}"
            env_val = os.getenv(env_name)
+            # Override with environment variable if present
            if env_val is not None:
                if isinstance(default_value, bool):
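
Note on the `get_env_defaults` hunk above: every CLI flag can be overridden by a `LIVEDATA_*` environment variable derived from the argument's `dest`. A minimal, self-contained sketch of that mapping — the helper name `env_defaults` and the demo flag are illustrative only, not part of the codebase:

    import argparse
    import os

    def env_defaults(parser: argparse.ArgumentParser, prefix: str = 'LIVEDATA') -> dict:
        # Same convention as the hunk above:
        # --basic-auth-password -> dest 'basic_auth_password' -> LIVEDATA_BASIC_AUTH_PASSWORD
        overrides = {}
        for action in parser._actions:
            env_name = f"{prefix}_{action.dest.upper().replace('-', '_')}"
            env_val = os.getenv(env_name)
            if env_val is not None:
                overrides[action.dest] = env_val
        return overrides

    parser = argparse.ArgumentParser()
    parser.add_argument('--basic-auth-password', default=None)
    os.environ['LIVEDATA_BASIC_AUTH_PASSWORD'] = 'hunter2'  # demo only
    parser.set_defaults(**env_defaults(parser))
    assert parser.parse_args([]).basic_auth_password == 'hunter2'
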
diff --git a/src/ess/livedata/dashboard/dashboard.py b/src/ess/livedata/dashboard/dashboard.py
index 7a81d98a8..6e33a8ae9 100644
--- a/src/ess/livedata/dashboard/dashboard.py
+++ b/src/ess/livedata/dashboard/dashboard.py
@@ -3,41 +3,18 @@
 """Common functionality for implementing dashboards."""
 
 import logging
-import threading
 from abc import ABC, abstractmethod
 from contextlib import ExitStack
 
 import panel as pn
-import scipp as sc
 from holoviews import Dimension, streams
 
 from ess.livedata import ServiceBase
-from ess.livedata.config import config_names, instrument_registry
-from ess.livedata.config.config_loader import load_config
-from ess.livedata.config.instruments import get_config
-from ess.livedata.config.schema_registry import get_schema_registry
-from ess.livedata.config.streams import get_stream_mapping, stream_kind_to_topic
-from ess.livedata.config.workflow_spec import ResultKey
-from ess.livedata.core.message import StreamKind
-from ess.livedata.kafka import consumer as kafka_consumer
-from ess.livedata.kafka.message_adapter import AdaptingMessageSource
-from ess.livedata.kafka.routes import RoutingAdapterBuilder
-from ess.livedata.kafka.source import BackgroundMessageSource
-
-from .config_service import ConfigService
-from .controller_factory import ControllerFactory
-from .data_service import DataService
-from .job_controller import JobController
-from .job_service import JobService
-from .kafka_transport import KafkaTransport
-from .message_bridge import BackgroundMessageBridge
-from .orchestrator import Orchestrator
-from .plotting_controller import PlottingController
-from .schema_validator import PydanticSchemaValidator
-from .stream_manager import StreamManager
-from .widgets.plot_creation_widget import PlotCreationWidget
-from .widgets.reduction_widget import ReductionWidget
-from .workflow_controller import WorkflowController
+
+from .config_store import ConfigStoreManager
+from .dashboard_services import DashboardServices
+from .kafka_transport import DashboardKafkaTransport
+from .transport import NullTransport, Transport
 
 # Global throttling for sliders, etc.
 pn.config.throttled = True
@@ -54,6 +31,7 @@ def __init__(
        log_level: int = logging.INFO,
        dashboard_name: str,
        port: int = 5007,
+        transport: str = 'kafka',
        basic_auth_password: str | None = None,
        basic_auth_cookie_secret: str | None = None,
    ):
@@ -62,23 +40,62 @@ def __init__(
        self._instrument = instrument
        self._port = port
        self._dev = dev
+
+        # Validate basic auth configuration
+        if (basic_auth_password is None) != (basic_auth_cookie_secret is None):
+            raise ValueError(
+                "Both 'basic_auth_password' and 'basic_auth_cookie_secret' must be "
+                "provided together, or neither should be provided."
+            )
        self._basic_auth_password = basic_auth_password
        self._basic_auth_cookie_secret = basic_auth_cookie_secret
+
        self._exit_stack = ExitStack()
        self._exit_stack.__enter__()
        self._callback = None
-        self._setup_config_service()
-        self._setup_data_infrastructure(instrument=instrument, dev=dev)
+
+        # Config store manager for file-backed persistent UI state (GUI dashboards)
+        config_manager = ConfigStoreManager(instrument=instrument, store_type='file')
+
+        # Set up all dashboard services
+        self._services = DashboardServices(
+            instrument=instrument,
+            dev=dev,
+            exit_stack=self._exit_stack,
+            logger=self._logger,
+            pipe_factory=streams.Pipe,
+            transport=self._create_transport(transport),
+            config_manager=config_manager,
+        )
+
        self._logger.info("%s initialized", self.__class__.__name__)
 
-        # Global unit format.
+        # Global unit format
        Dimension.unit_format = ' [{unit}]'
 
-        # Load the module to register the instrument's workflows.
-        self._instrument_module = get_config(instrument)
-        self._processor_factory = instrument_registry[self._instrument].workflow_factory
-        self._setup_workflow_management()
+    def _create_transport(self, transport: str) -> Transport:
+        """
+        Create a transport instance for the given transport type.
+
+        Parameters
+        ----------
+        transport:
+            Transport type ('kafka' or 'none').
+
+        Returns
+        -------
+        :
+            Transport instance.
+        """
+        if transport == 'kafka':
+            return DashboardKafkaTransport(
+                instrument=self._instrument, dev=self._dev, logger=self._logger
+            )
+        elif transport == 'none':
+            return NullTransport()
+        else:
+            raise ValueError(f"Unknown transport type: {transport}")
 
    @abstractmethod
    def create_sidebar_content(self) -> pn.viewable.Viewable:
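
For context on the new transport seam: `_create_transport` returns either a real Kafka-backed transport or a no-op stand-in, so the dashboard can run without a broker. A rough sketch of what the `Transport`/`NullTransport` pair in `.transport` might look like — this is an assumption, as their definitions are not part of this diff:

    from typing import Protocol

    class Transport(Protocol):
        """Lifecycle seam the dashboard drives through DashboardServices (assumed)."""

        def start(self) -> None: ...
        def stop(self) -> None: ...

    class NullTransport:
        """No-op transport: lets the UI run without a Kafka cluster."""

        def start(self) -> None:
            pass

        def stop(self) -> None:
            pass
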
@@ -88,105 +105,6 @@ def create_sidebar_content(self) -> pn.viewable.Viewable:
 
    @abstractmethod
    def create_main_content(self) -> pn.viewable.Viewable:
        """Override this method to create the main dashboard content."""
-        # Currently unused, should this allow for defining a custom layout where
-        # plots should be placed?
-
-    def _setup_config_service(self) -> None:
-        """Set up configuration service with Kafka bridge."""
-        kafka_downstream_config = load_config(namespace=config_names.kafka_downstream)
-        _, consumer = self._exit_stack.enter_context(
-            kafka_consumer.make_control_consumer(instrument=self._instrument)
-        )
-        kafka_transport = KafkaTransport(
-            kafka_config=kafka_downstream_config, consumer=consumer, logger=self._logger
-        )
-        self._kafka_bridge = BackgroundMessageBridge(
-            transport=kafka_transport, logger=self._logger
-        )
-        self._config_service = ConfigService(
-            message_bridge=self._kafka_bridge,
-            schema_validator=PydanticSchemaValidator(
-                schema_registry=get_schema_registry()
-            ),
-        )
-        self._controller_factory = ControllerFactory(
-            config_service=self._config_service,
-            schema_registry=get_schema_registry(),
-        )
-
-        self._kafka_bridge_thread = threading.Thread(
-            target=self._kafka_bridge.start, daemon=True
-        )
-        self._logger.info("Config service setup complete")
-
-    def _setup_data_infrastructure(self, instrument: str, dev: bool) -> None:
-        """Set up data services, forwarder, and orchestrator."""
-        # da00 of backend services converted to scipp.DataArray
-        ScippDataService = DataService[ResultKey, sc.DataArray]
-        self._data_service = ScippDataService()
-        self._stream_manager = StreamManager(
-            data_service=self._data_service, pipe_factory=streams.Pipe
-        )
-        self._job_service = JobService(
-            data_service=self._data_service, logger=self._logger
-        )
-        self._job_controller = JobController(
-            config_service=self._config_service, job_service=self._job_service
-        )
-        self._plotting_controller = PlottingController(
-            job_service=self._job_service,
-            stream_manager=self._stream_manager,
-            logger=self._logger,
-        )
-        self._orchestrator = Orchestrator(
-            self._setup_kafka_consumer(instrument=instrument, dev=dev),
-            data_service=self._data_service,
-            job_service=self._job_service,
-        )
-        self._logger.info("Data infrastructure setup complete")
-
-    def _setup_kafka_consumer(
-        self, instrument: str, dev: bool
-    ) -> AdaptingMessageSource:
-        """Set up Kafka consumer for data streams."""
-        consumer_config = load_config(
-            namespace=config_names.reduced_data_consumer, env=''
-        )
-        kafka_downstream_config = load_config(namespace=config_names.kafka_downstream)
-        data_topic = stream_kind_to_topic(
-            instrument=self._instrument, kind=StreamKind.LIVEDATA_DATA
-        )
-        status_topic = stream_kind_to_topic(
-            instrument=self._instrument, kind=StreamKind.LIVEDATA_STATUS
-        )
-        consumer = self._exit_stack.enter_context(
-            kafka_consumer.make_consumer_from_config(
-                topics=[data_topic, status_topic],
-                config={**consumer_config, **kafka_downstream_config},
-                group='dashboard',
-            )
-        )
-        source = self._exit_stack.enter_context(
-            BackgroundMessageSource(consumer=consumer)
-        )
-        stream_mapping = get_stream_mapping(instrument=instrument, dev=dev)
-        adapter = (
-            RoutingAdapterBuilder(stream_mapping=stream_mapping)
-            .with_livedata_data_route()
-            .with_livedata_status_route()
-            .build()
-        )
-        return AdaptingMessageSource(source=source, adapter=adapter)
-
-    def _setup_workflow_management(self) -> None:
-        """Initialize workflow controller and reduction widget."""
-        self._workflow_controller = WorkflowController.from_config_service(
-            config_service=self._config_service,
-            source_names=sorted(self._processor_factory.source_names),
-            workflow_registry=self._processor_factory,
-            data_service=self._data_service,
-        )
-        self._reduction_widget = ReductionWidget(controller=self._workflow_controller)
 
    def _step(self):
        """Step function for periodic updates."""
@@ -196,7 +114,7 @@ def _step(self):
        # succession, which is visually distracting.
        # Furthermore, this improves performance by reducing the number of re-renders.
        with pn.io.hold():
-            self._orchestrator.update()
+            self._services.orchestrator.update()
 
    def get_dashboard_title(self) -> str:
        """Get the dashboard title. Override for custom titles."""
@@ -227,7 +145,6 @@ def start_periodic_updates(self, period: int = 500) -> None:
        def _safe_step():
            try:
                self._step()
-                self._config_service.process_incoming_messages()
            except Exception:
                self._logger.exception("Error in periodic update step.")
@@ -237,11 +154,7 @@ def _safe_step():
    def create_layout(self) -> pn.template.MaterialTemplate:
        """Create the basic dashboard layout."""
        sidebar_content = self.create_sidebar_content()
-        main_content = PlotCreationWidget(
-            job_service=self._job_service,
-            job_controller=self._job_controller,
-            plotting_controller=self._plotting_controller,
-        ).widget
+        main_content = self.create_main_content()
 
        template = pn.template.MaterialTemplate(
            title=self.get_dashboard_title(),
@@ -267,7 +180,7 @@ def server(self):
 
    def _start_impl(self) -> None:
        """Start the dashboard service."""
-        self._kafka_bridge_thread.start()
+        self._services.start()
 
    def run_forever(self) -> None:
        """Run the dashboard server."""
@@ -290,6 +203,5 @@ def run_forever(self) -> None:
 
    def _stop_impl(self) -> None:
        """Clean shutdown of all components."""
-        self._kafka_bridge.stop()
-        self._kafka_bridge_thread.join()
+        self._services.stop()
        self._exit_stack.__exit__(None, None, None)
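
The removed `_setup_*` methods above now live behind the `DashboardServices` facade, which also owns the bridge thread that `_start_impl`/`_stop_impl` used to manage by hand. A hypothetical sketch of the lifecycle portion — the constructor wiring and any names beyond `start()`/`stop()` and the attributes referenced in this diff are assumptions, not the facade's actual implementation:

    import threading

    # Per the imports removed from dashboard.py above.
    from ess.livedata.dashboard.message_bridge import BackgroundMessageBridge

    class DashboardServices:
        """Sketch: owns the message bridge and its background thread."""

        def __init__(self, *, transport, logger, **kwargs):
            self._bridge = BackgroundMessageBridge(transport=transport, logger=logger)
            self._thread = threading.Thread(target=self._bridge.start, daemon=True)
            # ... config_service, data_service, orchestrator, etc. wired up here ...

        def start(self) -> None:
            # Replaces: self._kafka_bridge_thread.start()
            self._thread.start()

        def stop(self) -> None:
            # Replaces: self._kafka_bridge.stop(); self._kafka_bridge_thread.join()
            self._bridge.stop()
            self._thread.join()
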
diff --git a/src/ess/livedata/dashboard/reduction.py b/src/ess/livedata/dashboard/reduction.py
index 8fb10138c..61433681e 100644
--- a/src/ess/livedata/dashboard/reduction.py
+++ b/src/ess/livedata/dashboard/reduction.py
@@ -8,8 +8,13 @@
 from ess.livedata import Service
 
 from .dashboard import DashboardBase
+from .plot_orchestrator import PlotOrchestrator
+from .widgets.log_producer_widget import LogProducerWidget
+from .widgets.plot_creation_widget import PlotCreationWidget
+from .widgets.plot_grid_tabs import PlotGridTabs
+from .widgets.reduction_widget import ReductionWidget
 
-pn.extension('holoviews', 'modal', template='material')
+pn.extension('holoviews', 'modal', notifications=True, template='material')
 hv.extension('bokeh')
@@ -22,6 +27,7 @@ def __init__(
        instrument: str = 'dummy',
        dev: bool = False,
        log_level: int,
+        transport: str = 'kafka',
        basic_auth_password: str | None = None,
        basic_auth_cookie_secret: str | None = None,
    ):
@@ -31,27 +37,99 @@ def __init__(
            log_level=log_level,
            dashboard_name='reduction_dashboard',
            port=5009,  # Default port for reduction dashboard
+            transport=transport,
            basic_auth_password=basic_auth_password,
            basic_auth_cookie_secret=basic_auth_cookie_secret,
        )
+
+        # Create shared orchestrators (must be shared across all sessions)
+        self._plot_orchestrator = PlotOrchestrator(
+            plotting_controller=self._services.plotting_controller,
+            job_orchestrator=self._services.job_orchestrator,
+            data_service=self._services.data_service,
+            config_store=self._services.plotter_config_store,
+        )
+
        self._logger.info("Reduction dashboard initialized")
 
    def create_sidebar_content(self) -> pn.viewable.Viewable:
        """Create the sidebar content with workflow controls."""
+        # Create reduction widget (per-session)
+        reduction_widget = ReductionWidget(
+            controller=self._services.workflow_controller
+        )
+
+        # Create log producer widget only in dev mode (per-session)
+        dev_content = []
+        if self._dev:
+            dev_widget = LogProducerWidget(
+                instrument=self._instrument,
+                logger=self._logger,
+                exit_stack=self._exit_stack,
+            )
+            dev_content = [dev_widget.panel, pn.layout.Divider()]
+
        return pn.Column(
+            *dev_content,
            pn.pane.Markdown("## Data Reduction"),
-            self._reduction_widget.widget,
+            reduction_widget.widget,
        )
 
    def create_main_content(self) -> pn.viewable.Viewable:
-        """Create the main content area."""
-        return pn.Row()
+        """Create the main content area with tabs for the old and new interfaces."""
+        # Create the original plot creation widget (per-session)
+        plot_creation_widget = PlotCreationWidget(
+            job_service=self._services.job_service,
+            job_controller=self._services.job_controller,
+            plotting_controller=self._services.plotting_controller,
+            workflow_controller=self._services.workflow_controller,
+        ).widget
+
+        # Create UI widget connected to the shared orchestrator
+        plot_grid_tabs = PlotGridTabs(
+            plot_orchestrator=self._plot_orchestrator,
+            # Temporary hack, will likely get this from JobOrchestrator, or make
+            # the registry more accessible.
+            workflow_registry=self._services.workflow_controller._workflow_registry,
+            plotting_controller=self._services.plotting_controller,
+        )
+
+        # Create tabs with both old and new interfaces
+        main_tabs = pn.Tabs(
+            ('Legacy interface', plot_creation_widget),
+            (
+                'Future interface (incomplete and not functional)',
+                plot_grid_tabs.panel,
+            ),
+            sizing_mode='stretch_both',
+        )
+
+        return main_tabs
 
 
def get_arg_parser() -> argparse.ArgumentParser:
-    return Service.setup_arg_parser(
-        description='ESSlivedata Dashboard', dashboard_auth=True
-    )
+    parser = Service.setup_arg_parser(description='ESSlivedata Dashboard')
+    parser.add_argument(
+        '--transport',
+        choices=['kafka', 'none'],
+        default='kafka',
+        help='Transport backend for message handling',
+    )
+    parser.add_argument(
+        '--basic-auth-password',
+        default=None,
+        help='Basic authentication password for the dashboard. '
+        'For better security, use the environment variable '
+        'LIVEDATA_BASIC_AUTH_PASSWORD instead.',
+    )
+    parser.add_argument(
+        '--basic-auth-cookie-secret',
+        default=None,
+        help='Basic authentication cookie secret for the dashboard. '
+        'For better security, use the environment variable '
+        'LIVEDATA_BASIC_AUTH_COOKIE_SECRET instead.',
+    )
+    return parser
 
 
def main() -> None:
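
A hedged usage example tying the pieces together: constructing the reduction dashboard without Kafka for local UI work. The class name `ReductionDashboard` is assumed (the class statement falls outside the hunks above); per the diff, `transport='none'` selects `NullTransport` via `DashboardBase._create_transport`, and the keyword-only constructor arguments match the `__init__` signature shown:

    import logging

    from ess.livedata.dashboard.reduction import ReductionDashboard  # name assumed

    dashboard = ReductionDashboard(
        instrument='dummy',
        dev=True,                # also enables the LogProducerWidget in the sidebar
        log_level=logging.INFO,
        transport='none',        # NullTransport: no Kafka broker required
    )
    dashboard.run_forever()
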