Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions src/ess/livedata/core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def _stop_impl(self) -> None:

@staticmethod
def setup_arg_parser(
description: str, *, dev_flag: bool = True, dashboard_auth: bool = False
description: str, *, dev_flag: bool = True
) -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=description,
Expand All @@ -201,17 +201,6 @@ def setup_arg_parser(
default='INFO',
help='Set the logging level',
)
if dashboard_auth:
parser.add_argument(
'--basic-auth-password',
default=None,
help='Basic authentication password for the dashboard',
)
parser.add_argument(
'--basic-auth-cookie-secret',
default=None,
help='Basic authentication cookie secret for the dashboard',
)
return parser


Expand All @@ -229,6 +218,7 @@ def get_env_defaults(
# Convert --arg-name to LIVEDATA_ARG_NAME
env_name = f"{prefix}_{action.dest.upper().replace('-', '_')}"
env_val = os.getenv(env_name)

# Override with environment variable if present
if env_val is not None:
if isinstance(default_value, bool):
Expand Down
200 changes: 56 additions & 144 deletions src/ess/livedata/dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,18 @@
"""Common functionality for implementing dashboards."""

import logging
import threading
from abc import ABC, abstractmethod
from contextlib import ExitStack

import panel as pn
import scipp as sc
from holoviews import Dimension, streams

from ess.livedata import ServiceBase
from ess.livedata.config import config_names, instrument_registry
from ess.livedata.config.config_loader import load_config
from ess.livedata.config.instruments import get_config
from ess.livedata.config.schema_registry import get_schema_registry
from ess.livedata.config.streams import get_stream_mapping, stream_kind_to_topic
from ess.livedata.config.workflow_spec import ResultKey
from ess.livedata.core.message import StreamKind
from ess.livedata.kafka import consumer as kafka_consumer
from ess.livedata.kafka.message_adapter import AdaptingMessageSource
from ess.livedata.kafka.routes import RoutingAdapterBuilder
from ess.livedata.kafka.source import BackgroundMessageSource

from .config_service import ConfigService
from .controller_factory import ControllerFactory
from .data_service import DataService
from .job_controller import JobController
from .job_service import JobService
from .kafka_transport import KafkaTransport
from .message_bridge import BackgroundMessageBridge
from .orchestrator import Orchestrator
from .plotting_controller import PlottingController
from .schema_validator import PydanticSchemaValidator
from .stream_manager import StreamManager
from .widgets.plot_creation_widget import PlotCreationWidget
from .widgets.reduction_widget import ReductionWidget
from .workflow_controller import WorkflowController

from .config_store import ConfigStoreManager
from .dashboard_services import DashboardServices
from .kafka_transport import DashboardKafkaTransport
from .transport import NullTransport, Transport

# Global throttling for sliders, etc.
pn.config.throttled = True
Expand All @@ -54,6 +31,7 @@ def __init__(
log_level: int = logging.INFO,
dashboard_name: str,
port: int = 5007,
transport: str = 'kafka',
basic_auth_password: str | None = None,
basic_auth_cookie_secret: str | None = None,
):
Expand All @@ -62,23 +40,62 @@ def __init__(
self._instrument = instrument
self._port = port
self._dev = dev

# Validate basic auth configuration
if (basic_auth_password is None) != (basic_auth_cookie_secret is None):
raise ValueError(
"Both 'basic_auth_password' and 'basic_auth_cookie_secret' must be "
"provided together, or neither should be provided."
)
self._basic_auth_password = basic_auth_password
self._basic_auth_cookie_secret = basic_auth_cookie_secret

self._exit_stack = ExitStack()
self._exit_stack.__enter__()

self._callback = None
self._setup_config_service()
self._setup_data_infrastructure(instrument=instrument, dev=dev)

# Config store manager for file-backed persistent UI state (GUI dashboards)
config_manager = ConfigStoreManager(instrument=instrument, store_type='file')

# Setup all dashboard services
self._services = DashboardServices(
instrument=instrument,
dev=dev,
exit_stack=self._exit_stack,
logger=self._logger,
pipe_factory=streams.Pipe,
transport=self._create_transport(transport),
config_manager=config_manager,
)

self._logger.info("%s initialized", self.__class__.__name__)

# Global unit format.
# Global unit format
Dimension.unit_format = ' [{unit}]'

# Load the module to register the instrument's workflows.
self._instrument_module = get_config(instrument)
self._processor_factory = instrument_registry[self._instrument].workflow_factory
self._setup_workflow_management()
def _create_transport(self, transport: str) -> Transport:
"""
Create transport instance based on transport type.

Parameters
----------
transport:
Transport type ('kafka' or 'none')

Returns
-------
:
Transport instance
"""
if transport == 'kafka':
return DashboardKafkaTransport(
instrument=self._instrument, dev=self._dev, logger=self._logger
)
elif transport == 'none':
return NullTransport()
else:
raise ValueError(f"Unknown transport type: {transport}")

@abstractmethod
def create_sidebar_content(self) -> pn.viewable.Viewable:
Expand All @@ -88,105 +105,6 @@ def create_sidebar_content(self) -> pn.viewable.Viewable:
@abstractmethod
def create_main_content(self) -> pn.viewable.Viewable:
"""Override this method to create the main dashboard content."""
# Currently unused, should this allow for defining a custom layout where plots
# should be placed?

def _setup_config_service(self) -> None:
"""Set up configuration service with Kafka bridge."""
kafka_downstream_config = load_config(namespace=config_names.kafka_downstream)
_, consumer = self._exit_stack.enter_context(
kafka_consumer.make_control_consumer(instrument=self._instrument)
)
kafka_transport = KafkaTransport(
kafka_config=kafka_downstream_config, consumer=consumer, logger=self._logger
)
self._kafka_bridge = BackgroundMessageBridge(
transport=kafka_transport, logger=self._logger
)
self._config_service = ConfigService(
message_bridge=self._kafka_bridge,
schema_validator=PydanticSchemaValidator(
schema_registry=get_schema_registry()
),
)
self._controller_factory = ControllerFactory(
config_service=self._config_service,
schema_registry=get_schema_registry(),
)

self._kafka_bridge_thread = threading.Thread(
target=self._kafka_bridge.start, daemon=True
)
self._logger.info("Config service setup complete")

def _setup_data_infrastructure(self, instrument: str, dev: bool) -> None:
"""Set up data services, forwarder, and orchestrator."""
# da00 of backend services converted to scipp.DataArray
ScippDataService = DataService[ResultKey, sc.DataArray]
self._data_service = ScippDataService()
self._stream_manager = StreamManager(
data_service=self._data_service, pipe_factory=streams.Pipe
)
self._job_service = JobService(
data_service=self._data_service, logger=self._logger
)
self._job_controller = JobController(
config_service=self._config_service, job_service=self._job_service
)
self._plotting_controller = PlottingController(
job_service=self._job_service,
stream_manager=self._stream_manager,
logger=self._logger,
)
self._orchestrator = Orchestrator(
self._setup_kafka_consumer(instrument=instrument, dev=dev),
data_service=self._data_service,
job_service=self._job_service,
)
self._logger.info("Data infrastructure setup complete")

def _setup_kafka_consumer(
self, instrument: str, dev: bool
) -> AdaptingMessageSource:
"""Set up Kafka consumer for data streams."""
consumer_config = load_config(
namespace=config_names.reduced_data_consumer, env=''
)
kafka_downstream_config = load_config(namespace=config_names.kafka_downstream)
data_topic = stream_kind_to_topic(
instrument=self._instrument, kind=StreamKind.LIVEDATA_DATA
)
status_topic = stream_kind_to_topic(
instrument=self._instrument, kind=StreamKind.LIVEDATA_STATUS
)
consumer = self._exit_stack.enter_context(
kafka_consumer.make_consumer_from_config(
topics=[data_topic, status_topic],
config={**consumer_config, **kafka_downstream_config},
group='dashboard',
)
)
source = self._exit_stack.enter_context(
BackgroundMessageSource(consumer=consumer)
)
stream_mapping = get_stream_mapping(instrument=instrument, dev=dev)
adapter = (
RoutingAdapterBuilder(stream_mapping=stream_mapping)
.with_livedata_data_route()
.with_livedata_status_route()
.build()
)
return AdaptingMessageSource(source=source, adapter=adapter)

def _setup_workflow_management(self) -> None:
"""Initialize workflow controller and reduction widget."""
self._workflow_controller = WorkflowController.from_config_service(
config_service=self._config_service,
source_names=sorted(self._processor_factory.source_names),
workflow_registry=self._processor_factory,
data_service=self._data_service,
)
self._reduction_widget = ReductionWidget(controller=self._workflow_controller)

def _step(self):
"""Step function for periodic updates."""
Expand All @@ -196,7 +114,7 @@ def _step(self):
# succession, which is visually distracting.
# Furthermore, this improves performance by reducing the number of re-renders.
with pn.io.hold():
self._orchestrator.update()
self._services.orchestrator.update()

def get_dashboard_title(self) -> str:
"""Get the dashboard title. Override for custom titles."""
Expand Down Expand Up @@ -227,7 +145,6 @@ def start_periodic_updates(self, period: int = 500) -> None:
def _safe_step():
try:
self._step()
self._config_service.process_incoming_messages()
except Exception:
self._logger.exception("Error in periodic update step.")

Expand All @@ -237,11 +154,7 @@ def _safe_step():
def create_layout(self) -> pn.template.MaterialTemplate:
"""Create the basic dashboard layout."""
sidebar_content = self.create_sidebar_content()
main_content = PlotCreationWidget(
job_service=self._job_service,
job_controller=self._job_controller,
plotting_controller=self._plotting_controller,
).widget
main_content = self.create_main_content()

template = pn.template.MaterialTemplate(
title=self.get_dashboard_title(),
Expand All @@ -267,7 +180,7 @@ def server(self):

def _start_impl(self) -> None:
"""Start the dashboard service."""
self._kafka_bridge_thread.start()
self._services.start()

def run_forever(self) -> None:
"""Run the dashboard server."""
Expand All @@ -290,6 +203,5 @@ def run_forever(self) -> None:

def _stop_impl(self) -> None:
"""Clean shutdown of all components."""
self._kafka_bridge.stop()
self._kafka_bridge_thread.join()
self._services.stop()
self._exit_stack.__exit__(None, None, None)
Loading