Skip to content

Commit cb5c6fc

Browse files
SimonHeybrockclaude
andcommitted
Replace bespoke replace_staged_configs with general transaction mechanism
Adds staging_transaction() context manager to JobOrchestrator to batch multiple staging operations with a single notification. This replaces the bespoke replace_staged_configs() method with a more general, future-proof pattern. Benefits: - Handles any combination of staging operations, not just specific use cases - New operations automatically benefit from batched notifications - Follows standard Python context manager pattern - Cleaner API with fewer methods to maintain Changes: - Add staging_transaction() context manager to JobOrchestrator - Support nested transactions for the same workflow - Reject attempts to nest transactions for different workflows - Update _notify_staged_changed() to defer notifications during transactions - Update WorkflowController.start_workflow() to use transaction - Update WorkflowStatusWidget._on_remove_click() to use transaction - Remove deprecated replace_staged_configs() method - Replace 3 replace_staged_configs tests with 4 comprehensive transaction tests This fixes the N+1 notification problem in both start_workflow (N+2 → 2 notifications) and _on_remove_click (N+1 → 1 notification), and prevents similar issues in future code. Original prompt: Have a look at the latest commit and related code. Think about whether JobOrchestrator should have a transaction mechanism instead of some bespoke methods. Follow-up: Yes, please make a todo list and implement! 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent 9484763 commit cb5c6fc

File tree

4 files changed

+243
-95
lines changed

4 files changed

+243
-95
lines changed

src/ess/livedata/dashboard/job_orchestrator.py

Lines changed: 104 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import logging
1515
import uuid
1616
from collections.abc import Callable, Mapping
17+
from contextlib import contextmanager
1718
from dataclasses import dataclass, field
1819
from typing import NewType
1920
from uuid import UUID
@@ -138,6 +139,10 @@ def __init__(
138139
WidgetLifecycleSubscriptionId, WidgetLifecycleCallbacks
139140
] = {}
140141

142+
# Transaction state for batching staging operations
143+
self._transaction_workflow: WorkflowId | None = None
144+
self._transaction_depth: int = 0
145+
141146
# Load persisted configs
142147
self._load_configs_from_store()
143148

@@ -224,6 +229,90 @@ def _load_configs_from_store(self) -> None:
224229
# Future subscribers are notified via subscribe_to_workflow(), which
225230
# checks for existing active jobs and notifies immediately.
226231

232+
@contextmanager
233+
def staging_transaction(self, workflow_id: WorkflowId):
234+
"""
235+
Context manager for batching staging operations.
236+
237+
All staging operations (clear_staged_configs, stage_config) within the
238+
context will trigger only a single notification when the context exits.
239+
This prevents N+1 UI rebuild issues when performing multiple staging
240+
operations together.
241+
242+
Transactions can be nested - only the outermost transaction triggers
243+
the notification on exit.
244+
245+
Parameters
246+
----------
247+
workflow_id
248+
The workflow to perform staging operations on.
249+
250+
Yields
251+
------
252+
None
253+
254+
Raises
255+
------
256+
ValueError
257+
If attempting to nest transactions for different workflows.
258+
259+
Examples
260+
--------
261+
Replace all staged configs in one operation:
262+
263+
with orchestrator.staging_transaction(workflow_id):
264+
orchestrator.clear_staged_configs(workflow_id)
265+
for source in sources:
266+
orchestrator.stage_config(workflow_id, source, ...)
267+
# Single notification sent here
268+
269+
Remove specific sources:
270+
271+
with orchestrator.staging_transaction(workflow_id):
272+
staged = orchestrator.get_staged_config(workflow_id)
273+
orchestrator.clear_staged_configs(workflow_id)
274+
for source, config in staged.items():
275+
if source not in to_remove:
276+
orchestrator.stage_config(workflow_id, source, ...)
277+
# Single notification sent here
278+
"""
279+
# Enter transaction
280+
if self._transaction_workflow is None:
281+
self._transaction_workflow = workflow_id
282+
elif self._transaction_workflow != workflow_id:
283+
msg = (
284+
f'Cannot nest transactions for different workflows: '
285+
f'current={self._transaction_workflow}, new={workflow_id}'
286+
)
287+
raise ValueError(msg)
288+
289+
self._transaction_depth += 1
290+
self._logger.debug(
291+
'Entering staging transaction for workflow %s (depth=%d)',
292+
workflow_id,
293+
self._transaction_depth,
294+
)
295+
296+
try:
297+
yield
298+
finally:
299+
# Exit transaction
300+
self._transaction_depth -= 1
301+
self._logger.debug(
302+
'Exiting staging transaction for workflow %s (depth=%d)',
303+
workflow_id,
304+
self._transaction_depth,
305+
)
306+
307+
if self._transaction_depth == 0:
308+
# Outermost transaction exiting - send notification
309+
self._logger.debug(
310+
'Transaction complete for workflow %s, notifying subscribers',
311+
workflow_id,
312+
)
313+
self._transaction_workflow = None
314+
self._notify_staged_changed(workflow_id)
315+
227316
def clear_staged_configs(self, workflow_id: WorkflowId) -> None:
228317
"""
229318
Clear all staged configs for a workflow.
@@ -263,38 +352,6 @@ def stage_config(
263352
)
264353
self._notify_staged_changed(workflow_id)
265354

266-
def replace_staged_configs(
267-
self,
268-
workflow_id: WorkflowId,
269-
*,
270-
configs: dict[SourceName, JobConfig],
271-
) -> None:
272-
"""
273-
Replace all staged configs with new configs in a single operation.
274-
275-
This is more efficient than clear + multiple stage_config calls because
276-
it triggers only a single notification to subscribers, reducing UI rebuilds.
277-
278-
Parameters
279-
----------
280-
workflow_id
281-
The workflow to configure.
282-
configs
283-
Dict mapping source names to their configs.
284-
"""
285-
# Clear existing staged configs
286-
self._workflows[workflow_id].staged_jobs.clear()
287-
288-
# Add all new configs (with copies to prevent external mutation)
289-
for source_name, job_config in configs.items():
290-
self._workflows[workflow_id].staged_jobs[source_name] = JobConfig(
291-
params=job_config.params.copy(),
292-
aux_source_names=job_config.aux_source_names.copy(),
293-
)
294-
295-
# Single notification for the entire operation
296-
self._notify_staged_changed(workflow_id)
297-
298355
def commit_workflow(self, workflow_id: WorkflowId) -> list[JobId]:
299356
"""
300357
Commit staged configs and start workflow.
@@ -735,7 +792,21 @@ def unsubscribe_from_widget_lifecycle(
735792
)
736793

737794
def _notify_staged_changed(self, workflow_id: WorkflowId) -> None:
738-
"""Notify all widget subscribers that staging area changed."""
795+
"""Notify all widget subscribers that staging area changed.
796+
797+
Notifications are deferred if currently in a staging transaction.
798+
The transaction context manager will call this on exit.
799+
"""
800+
# Skip notification if we're in a transaction - it will be sent on exit
801+
if self._transaction_workflow is not None:
802+
self._logger.debug(
803+
'Deferring staged_changed notification for workflow %s '
804+
'(in transaction, depth=%d)',
805+
workflow_id,
806+
self._transaction_depth,
807+
)
808+
return
809+
739810
for subscription_id, callbacks in self._widget_subscriptions.items():
740811
if callbacks.on_staged_changed is not None:
741812
try:

src/ess/livedata/dashboard/widgets/workflow_status_widget.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -799,18 +799,19 @@ def _on_remove_click(self, source_names: list[str]) -> None:
799799
"""Handle remove button click - remove sources from staged config."""
800800
staged = self._orchestrator.get_staged_config(self._workflow_id)
801801

802-
# Remove the specified sources and re-stage the rest.
803-
# Note: clear_staged_configs() and stage_config() both notify subscribers,
804-
# so the widget will rebuild via _on_lifecycle_event callback.
805-
self._orchestrator.clear_staged_configs(self._workflow_id)
806-
for source_name, config in staged.items():
807-
if source_name not in source_names:
808-
self._orchestrator.stage_config(
809-
self._workflow_id,
810-
source_name=source_name,
811-
params=config.params,
812-
aux_source_names=config.aux_source_names,
813-
)
802+
# Remove the specified sources and re-stage the rest in a transaction.
803+
# Transaction ensures only a single notification is sent, so the widget
804+
# rebuilds once via _on_lifecycle_event callback.
805+
with self._orchestrator.staging_transaction(self._workflow_id):
806+
self._orchestrator.clear_staged_configs(self._workflow_id)
807+
for source_name, config in staged.items():
808+
if source_name not in source_names:
809+
self._orchestrator.stage_config(
810+
self._workflow_id,
811+
source_name=source_name,
812+
params=config.params,
813+
aux_source_names=config.aux_source_names,
814+
)
814815

815816
def _on_reset_click(self) -> None:
816817
"""Handle reset button click."""

src/ess/livedata/dashboard/workflow_controller.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from .configuration_adapter import ConfigurationState
2222
from .correlation_histogram import CorrelationHistogramController, make_workflow_spec
2323
from .data_service import DataService
24-
from .job_orchestrator import JobConfig, JobOrchestrator
24+
from .job_orchestrator import JobOrchestrator
2525
from .workflow_configuration_adapter import WorkflowConfigurationAdapter
2626

2727

@@ -133,15 +133,16 @@ def start_workflow(
133133
params_dict = config.model_dump(mode='json')
134134
aux_dict = aux_source_names.model_dump(mode='json') if aux_source_names else {}
135135

136-
# Build configs for all sources
137-
configs = {
138-
source_name: JobConfig(params=params_dict, aux_source_names=aux_dict)
139-
for source_name in source_names
140-
}
141-
142-
# Replace staged configs in a single operation (more efficient than
143-
# clear + multiple stage_config calls, reduces UI rebuilds)
144-
self._orchestrator.replace_staged_configs(workflow_id, configs=configs)
136+
# Replace all staged configs in a transaction (single notification)
137+
with self._orchestrator.staging_transaction(workflow_id):
138+
self._orchestrator.clear_staged_configs(workflow_id)
139+
for source_name in source_names:
140+
self._orchestrator.stage_config(
141+
workflow_id,
142+
source_name=source_name,
143+
params=params_dict,
144+
aux_source_names=aux_dict,
145+
)
145146

146147
# Commit and start workflow
147148
return self._orchestrator.commit_workflow(workflow_id)

0 commit comments

Comments
 (0)