From 6ba1e10ffbe37784308a33abd3e4e4bb9a43dd2d Mon Sep 17 00:00:00 2001
From: bethac07
Date: Wed, 11 Jun 2025 10:54:09 -0400
Subject: [PATCH] Refactor run_batch_general to precreate wells list, allow
 excludes
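
The list of wells to process is now built once, up front: from rows x
columns unless --wells was passed explicitly, then filtered by a new
--excluded-wells option that works with either form. Each step (zproj,
qc, assaydev, analysis) then iterates over that single precomputed list
instead of carrying its own duplicated rows/columns loops.

Hypothetical invocation (other arguments elided), skipping the four
corner wells of a 96-well plate:

    python run_batch_general.py ... --excluded-wells A01,A12,H01,H12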
---
 run_batch_general.py | 374 +++++++++++++++----------------------------
 1 file changed, 128 insertions(+), 246 deletions(-)

diff --git a/run_batch_general.py b/run_batch_general.py
index c089e6f..a6052a1 100644
--- a/run_batch_general.py
+++ b/run_batch_general.py
@@ -38,6 +38,7 @@ def run_batch_general(
     wells="", # (explicitly list wells. Overwrites rows and columns if passed. Not used by illum. e.g. ['B3','C7'])
     sites=range(1, 10), # (Not used by illum, qc, or assaydev.)
     well_digit_pad=True, # Set True to A01 well format name, set False to A1
+    excluded_wells="", # (any wells you want to remove. Works with wells or rows/columns. e.g. ['B3','C7'])
     pipeline="", # (overwrite default pipeline names)
     pipelinepath="", # (overwrite default path to pipelines)
     inputpath="", # (overwrite default path to input files)
@@ -126,10 +127,19 @@
         columns = range(1, 13)
     else:
         print(f"Unsupported plate format of {plate_format}.")
-    if well_digit_pad:
-        well_format = "02d"
-    else:
-        well_format = "01d"
+
+    if step != "illum":
+        if well_digit_pad:
+            well_format = "02d"
+        else:
+            well_format = "01d"
+        if all(len(ele) == 0 for ele in wells): # if the wells list wasn't set
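+            # e.g. rows A-H x columns 1-12 give ['A01', 'A02', ..., 'H12']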
Check your queue") @@ -310,60 +266,32 @@ def run_batch_general( csvname = "load_data.csv" for plate in platelist: - if all(len(ele) == 0 for ele in wells): - for eachrow in rows: - for eachcol in columns: - for eachsite in sites: - templateMessage_qc = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}},Metadata_Site={str(eachsite)}", - "pipeline": posixpath.join(pipelinepath, pipeline), - "output": outpath, - "input": inputpath, - "data_file": posixpath.join( - datafilepath, plate, csvname - ), - } - qcqueue.scheduleBatch(templateMessage_qc) - else: - for well in wells: - for eachsite in sites: - templateMessage_qc = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", - "pipeline": posixpath.join(pipelinepath, pipeline), - "output": outpath, - "input": inputpath, - "data_file": posixpath.join( - datafilepath, plate, csvname - ), - } - qcqueue.scheduleBatch(templateMessage_qc) + for well in wells: + for eachsite in sites: + templateMessage_qc = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", + "pipeline": posixpath.join(pipelinepath, pipeline), + "output": outpath, + "input": inputpath, + "data_file": posixpath.join( + datafilepath, plate, csvname + ), + } + qcqueue.scheduleBatch(templateMessage_qc) else: if not batchfile: batchfile = "Batch_data_qc.h5" for plate in platelist: - if all(len(ele) == 0 for ele in wells): - for eachrow in rows: - for eachcol in columns: - for eachsite in sites: - templateMessage_qc = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}},Metadata_Site={str(eachsite)}", - "pipeline": posixpath.join(batchpath, batchfile), - "output": outpath, - "input": inputpath, - "data_file": posixpath.join(batchpath, batchfile), - } - qcqueue.scheduleBatch(templateMessage_qc) - else: - for well in wells: - for eachsite in sites: - templateMessage_qc = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", - "pipeline": posixpath.join(batchpath, batchfile), - "output": outpath, - "input": inputpath, - "data_file": posixpath.join(batchpath, batchfile), - } - qcqueue.scheduleBatch(templateMessage_qc) + for well in wells: + for eachsite in sites: + templateMessage_qc = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", + "pipeline": posixpath.join(batchpath, batchfile), + "output": outpath, + "input": inputpath, + "data_file": posixpath.join(batchpath, batchfile), + } + qcqueue.scheduleBatch(templateMessage_qc) print("QC job submitted. 
Check your queue") @@ -378,52 +306,28 @@ def run_batch_general( csvname = "load_data_with_illum.csv" for plate in platelist: - if all(len(ele) == 0 for ele in wells): - for eachrow in rows: - for eachcol in columns: - templateMessage_ad = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}}", - "pipeline": posixpath.join(pipelinepath, pipeline), - "output": outpath, - "input": inputpath, - "data_file": posixpath.join(datafilepath, plate, csvname), - } - assaydevqueue.scheduleBatch(templateMessage_ad) - else: - for well in wells: - templateMessage_ad = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell}", - "pipeline": posixpath.join(pipelinepath, pipeline), - "output": outpath, - "input": inputpath, - "data_file": posixpath.join(datafilepath, plate, csvname), - } - assaydevqueue.scheduleBatch(templateMessage_ad) + for well in wells: + templateMessage_ad = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell}", + "pipeline": posixpath.join(pipelinepath, pipeline), + "output": outpath, + "input": inputpath, + "data_file": posixpath.join(datafilepath, plate, csvname), + } + assaydevqueue.scheduleBatch(templateMessage_ad) else: if not batchfile: batchfile = "Batch_data_assaydev.h5" for plate in platelist: - if all(len(ele) == 0 for ele in wells): - for eachrow in rows: - for eachcol in columns: - templateMessage_ad = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}}", - "pipeline": posixpath.join(batchpath, batchfile), - "output": outpath, - "input": inputpath, - "data_file": posixpath.join(batchpath, batchfile), - } - assaydevqueue.scheduleBatch(templateMessage_ad) - else: - for eachwell in wells: - templateMessage_ad = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell}", - "pipeline": posixpath.join(batchpath, batchfile), - "output": outpath, - "input": inputpath, - "data_file": posixpath.join(batchpath, batchfile), - } - assaydevqueue.scheduleBatch(templateMessage_ad) + for eachwell in wells: + templateMessage_ad = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell}", + "pipeline": posixpath.join(batchpath, batchfile), + "output": outpath, + "input": inputpath, + "data_file": posixpath.join(batchpath, batchfile), + } + assaydevqueue.scheduleBatch(templateMessage_ad) print("AssayDev job submitted. 
Check your queue") @@ -441,64 +345,34 @@ def run_batch_general( if not csvname: csvname = "load_data_with_illum.csv" for plate in platelist: - if all(len(ele) == 0 for ele in wells): - for eachrow in rows: - for eachcol in columns: - for eachsite in sites: - templateMessage_analysis = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}},Metadata_Site={str(eachsite)}", - "pipeline": posixpath.join(pipelinepath, pipeline), - "output": outpath, - "output_structure": outputstructure, - "input": inputpath, - "data_file": posixpath.join( - datafilepath, plate, csvname - ), - } - analysisqueue.scheduleBatch(templateMessage_analysis) - else: - for eachwell in wells: - for eachsite in sites: - templateMessage_analysis = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", - "pipeline": posixpath.join(pipelinepath, pipeline), - "output": outpath, - "output_structure": outputstructure, - "input": inputpath, - "data_file": posixpath.join( - datafilepath, plate, csvname - ), - } - analysisqueue.scheduleBatch(templateMessage_analysis) + for eachwell in wells: + for eachsite in sites: + templateMessage_analysis = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", + "pipeline": posixpath.join(pipelinepath, pipeline), + "output": outpath, + "output_structure": outputstructure, + "input": inputpath, + "data_file": posixpath.join( + datafilepath, plate, csvname + ), + } + analysisqueue.scheduleBatch(templateMessage_analysis) else: if not batchfile: batchfile = "Batch_data_analysis.h5" for plate in platelist: - if all(len(ele) == 0 for ele in wells): - for eachrow in rows: - for eachcol in columns: - for eachsite in sites: - templateMessage_analysis = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}},Metadata_Site={str(eachsite)}", - "pipeline": posixpath.join(batchpath, batchfile), - "output": outpath, - "output_structure": outputstructure, - "input": inputpath, - "data_file": posixpath.join(batchpath, batchfile), - } - analysisqueue.scheduleBatch(templateMessage_analysis) - else: - for eachwell in wells: - for eachsite in sites: - templateMessage_analysis = { - "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", - "pipeline": posixpath.join(batchpath, batchfile), - "output": outpath, - "output_structure": outputstructure, - "input": inputpath, - "data_file": posixpath.join(batchpath, batchfile), - } - analysisqueue.scheduleBatch(templateMessage_analysis) + for eachwell in wells: + for eachsite in sites: + templateMessage_analysis = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", + "pipeline": posixpath.join(batchpath, batchfile), + "output": outpath, + "output_structure": outputstructure, + "input": inputpath, + "data_file": posixpath.join(batchpath, batchfile), + } + analysisqueue.scheduleBatch(templateMessage_analysis) print("Analysis job submitted. Check your queue") @@ -556,6 +430,13 @@ def run_batch_general( default="", help="Explicit list of rows to process. Will overwrite --rows and --columns.", ) + parser.add_argument( + "--excluded-wells", + dest="excluded_wells", + type=lambda s: list(s.split(",")), + default="", + help="Explicit list wells not to process. 
+        type=lambda s: list(s.split(",")),
+        default="",
+        help="Explicit list of wells not to process. Works with either --wells or --rows and --columns.",
+    )
     parser.add_argument(
         "--sites",
         dest="sites",
@@ -656,4 +537,5 @@
         usebatch=args.usebatch,
         batchfile=args.batchfile,
         batchpath=args.batchpath,
+        excluded_wells=args.excluded_wells,
     )