From 4fed864215a18a513dab111942884b57479540d2 Mon Sep 17 00:00:00 2001 From: Robert Date: Wed, 11 Dec 2024 10:06:08 -0500 Subject: [PATCH] omics/managers: ensure the log file for update_from_pipeline() is closed Saw a ResourceWarning issued inside the atomic_dry wrapper, so let's make sure the file is written to and closed properly before it returns to the atomic_dry wrapper. Implemented via ExitStack since the log file opening is conditional. The input file is now also opened via the ExitStack to keep the indentation level for the input file handling portion as-is. --- mibios/omics/managers.py | 121 +++++++++++++++++++++------------------ 1 file changed, 65 insertions(+), 56 deletions(-) diff --git a/mibios/omics/managers.py b/mibios/omics/managers.py index 05bc0d33..4713f04c 100644 --- a/mibios/omics/managers.py +++ b/mibios/omics/managers.py @@ -2,6 +2,7 @@ Module for data load managers """ from collections import defaultdict +from contextlib import ExitStack from datetime import date from functools import partial from itertools import groupby, islice @@ -702,46 +703,51 @@ def update_from_pipeline_registry( (SUCCESS, 'import_success'), ) - # setup log file output - if log_dir := getattr(settings, 'IMPORT_LOG_DIR', ''): - # find a unique log file name and set up the log handler - path = Path(log_dir) / 'omics_status_update' - today = date.today() - suffix = f'.{today}.log' - num = 0 - while path.with_suffix(suffix).exists(): - num += 1 - suffix = f'.{today}.{num}.log' - - log_file = path.with_suffix(suffix).open('w') - print(f'Logging to: {log_file.name}') - else: - log_file = sys.stdout + # ExitStack: To cleanly close log file (if needed) and input file as + # there is some interaction with the atomic_dry wrapper + with ExitStack() as estack: + + # setup log file output + if log_dir := getattr(settings, 'IMPORT_LOG_DIR', ''): + # find a unique log file name and set up the log handler + log_file_base = Path(log_dir) / 'omics_status_update' + today = date.today() + suffix = f'.{today}.log' + num = 0 + while (log_file := log_file_base.with_suffix(suffix)).exists(): + num += 1 + suffix = f'.{today}.{num}.log' + + log_file = estack.enter_context(log_file.open('w')) + print(f'Logging to: {log_file.name}') + else: + log_file = sys.stdout - def log(msg): - if quiet and msg.startswith('INFO'): - return - print(msg, file=log_file) + def log(msg): + if quiet and msg.startswith('INFO'): + return + print(msg, file=log_file) - def err(msg): - msg = f'ERROR: {msg}' - if skip_on_error: - log(msg) - else: - raise RuntimeError(msg) + def err(msg): + msg = f'ERROR: {msg}' + if skip_on_error: + log(msg) + else: + raise RuntimeError(msg) - objs = self.select_related('dataset').in_bulk(field_name='sample_id') + objs = self.select_related('dataset') \ + .in_bulk(field_name='sample_id') - if source_file is None: - source_file = self.get_omics_import_file() + if source_file is None: + source_file = self.get_omics_import_file() - with open(source_file) as f: print(f'Reading {source_file} ...') - head = f.readline().rstrip('\n').split('\t') + srcf = estack.enter_context(open(source_file)) + head = srcf.readline().rstrip('\n').split('\t') for index, column in COLUMN_NAMES: if head[index] != column: raise RuntimeError( - f'unexpected header in {f.name}: 0-based column ' + f'unexpected header in {srcf.name}: 0-based column ' f'{index} is "{head[index]}" but expected "{column}"' ) @@ -751,7 +757,7 @@ def err(msg): unchanged = 0 notfound = 0 nosuccess = 0 - for lineno, line in enumerate(f, start=1): + for lineno, line in enumerate(srcf, start=1): row = line.rstrip('\n').split('\t') sample_id = row[SAMPLE_ID] dataset = row[STUDY_ID] @@ -827,30 +833,33 @@ def err(msg): if not new: tr.save() # update timestamp - log('Summary:') - log(f' records read from file: {lineno}') - log(f' (unique) samples listed: {len(samp_id_seen)}') - log(f' samples updated: {changed}') - if notfound: - log(f'WARNING Samples missing from database (or hidden): ' - f'{notfound}') - - if nosuccess: - log(f'WARNING Samples not marked "import_success": {nosuccess}') - - if unchanged: - log(f'Data for {unchanged} listed samples remain unchanged.') - - missing_or_bad = self.exclude(pk__in=good_seen) - if missing_or_bad.exists(): - log(f'WARNING The DB has {missing_or_bad.count()} samples ' - f'which are missing from {source_file} or which had ' - f'to be skipped for other reasons.') - - missing = self.exclude(sample_id__in=samp_id_seen) - if missing.exists(): - log(f'WARNING The DB has {missing.count()} samples not at all ' - f'listed in {source_file}') + srcf.close() + + log('Summary:') + log(f' records read from file: {lineno}') + log(f' (unique) samples listed: {len(samp_id_seen)}') + log(f' samples updated: {changed}') + if notfound: + log(f'WARNING Samples missing from database (or hidden): ' + f'{notfound}') + + if nosuccess: + log(f'WARNING Samples not marked "import_success": ' + f'{nosuccess}') + + if unchanged: + log(f'Data for {unchanged} listed samples remain unchanged.') + + missing_or_bad = self.exclude(pk__in=good_seen) + if missing_or_bad.exists(): + log(f'WARNING The DB has {missing_or_bad.count()} samples ' + f'which are missing from {source_file} or which had ' + f'to be skipped for other reasons.') + + missing = self.exclude(sample_id__in=samp_id_seen) + if missing.exists(): + log(f'WARNING The DB has {missing.count()} samples not at all ' + f'listed in {source_file}') def get_metagenomic_loader_script(self): """