Skip to content

Commit 9484763

Browse files
committed
Optimize start_workflow to reduce multiple UI rebuilds
Adds JobOrchestrator.replace_staged_configs() to replace all staged configs in a single operation with a single notification to subscribers. This significantly reduces UI rebuilds during workflow start. Before this change, start_workflow triggered N+2 widget rebuilds: - clear_staged_configs() → rebuild 1 - stage_config() for source 1 → rebuild 2 - stage_config() for source 2 → rebuild 3 - ... - stage_config() for source N → rebuild N+1 - commit_workflow() → rebuild N+2 After this change, it triggers only 2 rebuilds: - replace_staged_configs() → rebuild 1 (all sources) - commit_workflow() → rebuild 2 Changes: - Add replace_staged_configs() method to JobOrchestrator - Update WorkflowController.start_workflow() to use replace_staged_configs() - Add 3 tests verifying single notification, config replacement, and copying This improves performance and reduces UI flicker when starting workflows with multiple sources. Related to workflow-status-widget-architecture-fix.md review feedback.
1 parent 1662a2d commit 9484763

File tree

3 files changed

+164
-12
lines changed

3 files changed

+164
-12
lines changed

src/ess/livedata/dashboard/job_orchestrator.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,38 @@ def stage_config(
263263
)
264264
self._notify_staged_changed(workflow_id)
265265

266+
def replace_staged_configs(
267+
self,
268+
workflow_id: WorkflowId,
269+
*,
270+
configs: dict[SourceName, JobConfig],
271+
) -> None:
272+
"""
273+
Replace all staged configs with new configs in a single operation.
274+
275+
This is more efficient than clear + multiple stage_config calls because
276+
it triggers only a single notification to subscribers, reducing UI rebuilds.
277+
278+
Parameters
279+
----------
280+
workflow_id
281+
The workflow to configure.
282+
configs
283+
Dict mapping source names to their configs.
284+
"""
285+
# Clear existing staged configs
286+
self._workflows[workflow_id].staged_jobs.clear()
287+
288+
# Add all new configs (with copies to prevent external mutation)
289+
for source_name, job_config in configs.items():
290+
self._workflows[workflow_id].staged_jobs[source_name] = JobConfig(
291+
params=job_config.params.copy(),
292+
aux_source_names=job_config.aux_source_names.copy(),
293+
)
294+
295+
# Single notification for the entire operation
296+
self._notify_staged_changed(workflow_id)
297+
266298
def commit_workflow(self, workflow_id: WorkflowId) -> list[JobId]:
267299
"""
268300
Commit staged configs and start workflow.

src/ess/livedata/dashboard/workflow_controller.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from .configuration_adapter import ConfigurationState
2222
from .correlation_histogram import CorrelationHistogramController, make_workflow_spec
2323
from .data_service import DataService
24-
from .job_orchestrator import JobOrchestrator
24+
from .job_orchestrator import JobConfig, JobOrchestrator
2525
from .workflow_configuration_adapter import WorkflowConfigurationAdapter
2626

2727

@@ -129,21 +129,19 @@ def start_workflow(
129129
if not source_names:
130130
return []
131131

132-
# Clear existing staged configs and stage new ones
133-
# This ensures only the requested sources are included in the workflow
134-
self._orchestrator.clear_staged_configs(workflow_id)
135-
136132
# Convert Pydantic models to dicts for orchestrator
137133
params_dict = config.model_dump(mode='json')
138134
aux_dict = aux_source_names.model_dump(mode='json') if aux_source_names else {}
139135

140-
for source_name in source_names:
141-
self._orchestrator.stage_config(
142-
workflow_id,
143-
source_name=source_name,
144-
params=params_dict,
145-
aux_source_names=aux_dict,
146-
)
136+
# Build configs for all sources
137+
configs = {
138+
source_name: JobConfig(params=params_dict, aux_source_names=aux_dict)
139+
for source_name in source_names
140+
}
141+
142+
# Replace staged configs in a single operation (more efficient than
143+
# clear + multiple stage_config calls, reduces UI rebuilds)
144+
self._orchestrator.replace_staged_configs(workflow_id, configs=configs)
147145

148146
# Commit and start workflow
149147
return self._orchestrator.commit_workflow(workflow_id)

tests/dashboard/job_orchestrator_test.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1617,3 +1617,125 @@ def test_none_callbacks_are_skipped(
16171617

16181618
# Only the commit callback was set
16191619
assert events == ['committed']
1620+
1621+
def test_replace_staged_configs_replaces_all_configs(
1622+
self,
1623+
workflow_with_params: WorkflowSpec,
1624+
fake_workflow_config_service: FakeWorkflowConfigService,
1625+
):
1626+
"""replace_staged_configs replaces all staged configs in one operation."""
1627+
from ess.livedata.dashboard.job_orchestrator import JobConfig
1628+
1629+
workflow_id = workflow_with_params.get_id()
1630+
registry = {workflow_id: workflow_with_params}
1631+
1632+
orchestrator = JobOrchestrator(
1633+
command_service=CommandService(sink=FakeMessageSink()),
1634+
workflow_config_service=fake_workflow_config_service,
1635+
workflow_registry=registry,
1636+
config_store=None,
1637+
)
1638+
1639+
# Stage some initial configs
1640+
orchestrator.stage_config(
1641+
workflow_id,
1642+
source_name="det_1",
1643+
params={"threshold": 100.0},
1644+
aux_source_names={},
1645+
)
1646+
1647+
# Replace with new configs
1648+
new_configs = {
1649+
"det_2": JobConfig(params={"threshold": 200.0}, aux_source_names={}),
1650+
"det_3": JobConfig(params={"threshold": 300.0}, aux_source_names={}),
1651+
}
1652+
orchestrator.replace_staged_configs(workflow_id, configs=new_configs)
1653+
1654+
# Old config should be gone, new configs present
1655+
staged = orchestrator.get_staged_config(workflow_id)
1656+
assert "det_1" not in staged
1657+
assert "det_2" in staged
1658+
assert "det_3" in staged
1659+
assert staged["det_2"].params == {"threshold": 200.0}
1660+
assert staged["det_3"].params == {"threshold": 300.0}
1661+
1662+
def test_replace_staged_configs_triggers_single_notification(
1663+
self,
1664+
workflow_with_params: WorkflowSpec,
1665+
fake_workflow_config_service: FakeWorkflowConfigService,
1666+
):
1667+
"""replace_staged_configs triggers only one notification for efficiency."""
1668+
from ess.livedata.dashboard.job_orchestrator import (
1669+
JobConfig,
1670+
WidgetLifecycleCallbacks,
1671+
)
1672+
1673+
workflow_id = workflow_with_params.get_id()
1674+
registry = {workflow_id: workflow_with_params}
1675+
1676+
orchestrator = JobOrchestrator(
1677+
command_service=CommandService(sink=FakeMessageSink()),
1678+
workflow_config_service=fake_workflow_config_service,
1679+
workflow_registry=registry,
1680+
config_store=None,
1681+
)
1682+
1683+
# Subscribe and count notifications
1684+
notification_count = 0
1685+
1686+
def count_notification(wid):
1687+
nonlocal notification_count
1688+
notification_count += 1
1689+
1690+
callbacks = WidgetLifecycleCallbacks(on_staged_changed=count_notification)
1691+
orchestrator.subscribe_to_widget_lifecycle(callbacks)
1692+
1693+
# Clear the counter (initial subscription triggers notification)
1694+
notification_count = 0
1695+
1696+
# Replace with multiple configs - should trigger only ONE notification
1697+
new_configs = {
1698+
"det_1": JobConfig(params={"threshold": 100.0}, aux_source_names={}),
1699+
"det_2": JobConfig(params={"threshold": 200.0}, aux_source_names={}),
1700+
"det_3": JobConfig(params={"threshold": 300.0}, aux_source_names={}),
1701+
}
1702+
orchestrator.replace_staged_configs(workflow_id, configs=new_configs)
1703+
1704+
# Only one notification despite replacing 3 configs
1705+
assert notification_count == 1
1706+
1707+
def test_replace_staged_configs_copies_config_dicts(
1708+
self,
1709+
workflow_with_params: WorkflowSpec,
1710+
fake_workflow_config_service: FakeWorkflowConfigService,
1711+
):
1712+
"""replace_staged_configs copies config dicts to prevent external mutation."""
1713+
from ess.livedata.dashboard.job_orchestrator import JobConfig
1714+
1715+
workflow_id = workflow_with_params.get_id()
1716+
registry = {workflow_id: workflow_with_params}
1717+
1718+
orchestrator = JobOrchestrator(
1719+
command_service=CommandService(sink=FakeMessageSink()),
1720+
workflow_config_service=fake_workflow_config_service,
1721+
workflow_registry=registry,
1722+
config_store=None,
1723+
)
1724+
1725+
# Create configs with mutable dicts
1726+
params = {"threshold": 100.0}
1727+
aux = {"monitor": "mon_1"}
1728+
configs = {
1729+
"det_1": JobConfig(params=params, aux_source_names=aux),
1730+
}
1731+
1732+
orchestrator.replace_staged_configs(workflow_id, configs=configs)
1733+
1734+
# Mutate the original dicts
1735+
params["threshold"] = 999.0
1736+
aux["monitor"] = "mutated"
1737+
1738+
# Staged config should be unaffected (was copied)
1739+
staged = orchestrator.get_staged_config(workflow_id)
1740+
assert staged["det_1"].params["threshold"] == 100.0
1741+
assert staged["det_1"].aux_source_names["monitor"] == "mon_1"

0 commit comments

Comments
 (0)