@@ -306,7 +306,7 @@ def state_complete(workflow, state):
306
306
)
307
307
308
308
309
- def workflow_state_change_cb (event , handle , k8s_api ):
309
+ def workflow_state_change_cb (event , handle , k8s_api , disable_fluxion ):
310
310
"""Exception-catching wrapper around _workflow_state_change_cb_inner."""
311
311
try :
312
312
workflow = event ["object" ]
@@ -324,7 +324,9 @@ def workflow_state_change_cb(event, handle, k8s_api):
324
324
del _WORKFLOWINFO_CACHE [jobid ]
325
325
return
326
326
try :
327
- _workflow_state_change_cb_inner (workflow , jobid , winfo , handle , k8s_api )
327
+ _workflow_state_change_cb_inner (
328
+ workflow , jobid , winfo , handle , k8s_api , disable_fluxion
329
+ )
328
330
except Exception :
329
331
LOGGER .exception (
330
332
"Failed to process event update for workflow with jobid %s:" , jobid
@@ -342,7 +344,9 @@ def workflow_state_change_cb(event, handle, k8s_api):
342
344
handle .job_raise (jobid , "exception" , 0 , "DWS/Rabbit interactions failed" )
343
345
344
346
345
- def _workflow_state_change_cb_inner (workflow , jobid , winfo , handle , k8s_api ):
347
+ def _workflow_state_change_cb_inner (
348
+ workflow , jobid , winfo , handle , k8s_api , disable_fluxion
349
+ ):
346
350
if "state" not in workflow ["status" ]:
347
351
# workflow was just submitted, DWS still needs to give workflow
348
352
# a state of 'Proposal'
@@ -379,13 +383,15 @@ def _workflow_state_change_cb_inner(workflow, jobid, winfo, handle, k8s_api):
379
383
resources = flux .job .kvslookup .job_kvs_lookup (handle , jobid )["jobspec" ][
380
384
"resources"
381
385
]
386
+ if not disable_fluxion :
387
+ resources = directivebreakdown .apply_breakdowns (
388
+ k8s_api , workflow , resources , _MIN_ALLOCATION_SIZE
389
+ )
382
390
handle .rpc (
383
391
"job-manager.dws.resource-update" ,
384
392
payload = {
385
393
"id" : jobid ,
386
- "resources" : directivebreakdown .apply_breakdowns (
387
- k8s_api , workflow , resources , _MIN_ALLOCATION_SIZE
388
- ),
394
+ "resources" : resources ,
389
395
},
390
396
).then (log_rpc_response )
391
397
elif state_complete (workflow , "Setup" ):
@@ -537,7 +543,10 @@ def init_rabbits(k8s_api, handle, watchers, graph_path, disable_draining):
537
543
else :
538
544
mark_rabbit (handle , rabbit ["status" ]["status" ], * rabbit_rpaths [name ], name )
539
545
drain_offline_nodes (
540
- handle , name , rabbit ["status" ]["access" ].get ("computes" , []), disable_draining
546
+ handle ,
547
+ name ,
548
+ rabbit ["status" ]["access" ].get ("computes" , []),
549
+ disable_draining ,
541
550
)
542
551
watchers .add_watch (
543
552
Watch (
@@ -625,6 +634,11 @@ def setup_parsing():
625
634
action = "store_true" ,
626
635
help = "Disable the draining of compute nodes based on k8s status" ,
627
636
)
637
+ parser .add_argument (
638
+ "--disable-fluxion" ,
639
+ action = "store_true" ,
640
+ help = "Disable Fluxion scheduling of rabbits" ,
641
+ )
628
642
return parser
629
643
630
644
@@ -729,16 +743,25 @@ def main():
729
743
# start watching k8s workflow resources and operate on them when updates occur
730
744
# or new RPCs are received
731
745
with Watchers (handle , watch_interval = args .watch_interval ) as watchers :
732
- init_rabbits (
733
- k8s_api ,
734
- handle ,
735
- watchers ,
736
- args .resourcegraph ,
737
- args .disable_compute_node_draining ,
738
- )
746
+ if not args .disable_fluxion :
747
+ init_rabbits (
748
+ k8s_api ,
749
+ handle ,
750
+ watchers ,
751
+ args .resourcegraph ,
752
+ args .disable_compute_node_draining ,
753
+ )
739
754
services = register_services (handle , k8s_api )
740
755
watchers .add_watch (
741
- Watch (k8s_api , WORKFLOW_CRD , 0 , workflow_state_change_cb , handle , k8s_api )
756
+ Watch (
757
+ k8s_api ,
758
+ WORKFLOW_CRD ,
759
+ 0 ,
760
+ workflow_state_change_cb ,
761
+ handle ,
762
+ k8s_api ,
763
+ args .disable_fluxion ,
764
+ )
742
765
)
743
766
raise_self_exception (handle )
744
767
0 commit comments