omics/managers: ensure the log file for update_from_pipeline() is closed
A ResourceWarning was issued from inside the atomic_dry wrapper, so
make sure the log file is written and closed properly before control
returns to the atomic_dry wrapper.  Implemented via ExitStack, since
opening the log file is conditional.  The input file is now also opened
via the ExitStack, which keeps the indentation level of the input file
handling portion as-is.
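
For readers unfamiliar with the pattern: resources entered into an
ExitStack via enter_context() are all closed when the with block exits,
and registration may happen conditionally.  A minimal sketch of the
idea (the function and file names are illustrative, not from this
commit):

    from contextlib import ExitStack
    import sys

    def run(log_dir=None, input_path='data.tsv'):
        with ExitStack() as stack:
            if log_dir:
                # conditionally opened; the stack guarantees close on exit
                log_file = stack.enter_context(
                    open(f'{log_dir}/run.log', 'w')
                )
            else:
                # sys.stdout is not registered, so it is never closed
                log_file = sys.stdout
            # the input file goes on the same stack, avoiding a nested
            # "with open(...)" block and an extra indentation level
            infile = stack.enter_context(open(input_path))
            for line in infile:
                print(line.rstrip('\n'), file=log_file)
        # both files (when opened here) are closed at this point, before
        # control returns to the caller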
robert102 committed Dec 11, 2024
1 parent 4304f6a commit 4fed864
Showing 1 changed file with 65 additions and 56 deletions.
121 changes: 65 additions & 56 deletions mibios/omics/managers.py
@@ -2,6 +2,7 @@
 Module for data load managers
 """
 from collections import defaultdict
+from contextlib import ExitStack
 from datetime import date
 from functools import partial
 from itertools import groupby, islice
@@ -702,46 +703,51 @@ def update_from_pipeline_registry(
             (SUCCESS, 'import_success'),
         )

-        # setup log file output
-        if log_dir := getattr(settings, 'IMPORT_LOG_DIR', ''):
-            # find a unique log file name and set up the log handler
-            path = Path(log_dir) / 'omics_status_update'
-            today = date.today()
-            suffix = f'.{today}.log'
-            num = 0
-            while path.with_suffix(suffix).exists():
-                num += 1
-                suffix = f'.{today}.{num}.log'
-
-            log_file = path.with_suffix(suffix).open('w')
-            print(f'Logging to: {log_file.name}')
-        else:
-            log_file = sys.stdout
+        # ExitStack: To cleanly close log file (if needed) and input file as
+        # there is some interaction with the atomic_dry wrapper
+        with ExitStack() as estack:
+
+            # setup log file output
+            if log_dir := getattr(settings, 'IMPORT_LOG_DIR', ''):
+                # find a unique log file name and set up the log handler
+                log_file_base = Path(log_dir) / 'omics_status_update'
+                today = date.today()
+                suffix = f'.{today}.log'
+                num = 0
+                while (log_file := log_file_base.with_suffix(suffix)).exists():
+                    num += 1
+                    suffix = f'.{today}.{num}.log'
+
+                log_file = estack.enter_context(log_file.open('w'))
+                print(f'Logging to: {log_file.name}')
+            else:
+                log_file = sys.stdout

-        def log(msg):
-            if quiet and msg.startswith('INFO'):
-                return
-            print(msg, file=log_file)
+            def log(msg):
+                if quiet and msg.startswith('INFO'):
+                    return
+                print(msg, file=log_file)

-        def err(msg):
-            msg = f'ERROR: {msg}'
-            if skip_on_error:
-                log(msg)
-            else:
-                raise RuntimeError(msg)
+            def err(msg):
+                msg = f'ERROR: {msg}'
+                if skip_on_error:
+                    log(msg)
+                else:
+                    raise RuntimeError(msg)

-        objs = self.select_related('dataset').in_bulk(field_name='sample_id')
+            objs = self.select_related('dataset') \
+                .in_bulk(field_name='sample_id')

-        if source_file is None:
-            source_file = self.get_omics_import_file()
+            if source_file is None:
+                source_file = self.get_omics_import_file()

-        with open(source_file) as f:
-            print(f'Reading {source_file} ...')
-            head = f.readline().rstrip('\n').split('\t')
+            srcf = estack.enter_context(open(source_file))
+            head = srcf.readline().rstrip('\n').split('\t')
             for index, column in COLUMN_NAMES:
                 if head[index] != column:
                     raise RuntimeError(
-                        f'unexpected header in {f.name}: 0-based column '
+                        f'unexpected header in {srcf.name}: 0-based column '
                         f'{index} is "{head[index]}" but expected "{column}"'
                     )

@@ -751,7 +757,7 @@ def err(msg):
             unchanged = 0
             notfound = 0
             nosuccess = 0
-            for lineno, line in enumerate(f, start=1):
+            for lineno, line in enumerate(srcf, start=1):
                 row = line.rstrip('\n').split('\t')
                 sample_id = row[SAMPLE_ID]
                 dataset = row[STUDY_ID]
@@ -827,30 +833,33 @@ def err(msg):
                 if not new:
                     tr.save()  # update timestamp

-        log('Summary:')
-        log(f' records read from file: {lineno}')
-        log(f' (unique) samples listed: {len(samp_id_seen)}')
-        log(f' samples updated: {changed}')
-        if notfound:
-            log(f'WARNING Samples missing from database (or hidden): '
-                f'{notfound}')
-
-        if nosuccess:
-            log(f'WARNING Samples not marked "import_success": {nosuccess}')
-
-        if unchanged:
-            log(f'Data for {unchanged} listed samples remain unchanged.')
-
-        missing_or_bad = self.exclude(pk__in=good_seen)
-        if missing_or_bad.exists():
-            log(f'WARNING The DB has {missing_or_bad.count()} samples '
-                f'which are missing from {source_file} or which had '
-                f'to be skipped for other reasons.')
-
-        missing = self.exclude(sample_id__in=samp_id_seen)
-        if missing.exists():
-            log(f'WARNING The DB has {missing.count()} samples not at all '
-                f'listed in {source_file}')
+            srcf.close()
+
+            log('Summary:')
+            log(f' records read from file: {lineno}')
+            log(f' (unique) samples listed: {len(samp_id_seen)}')
+            log(f' samples updated: {changed}')
+            if notfound:
+                log(f'WARNING Samples missing from database (or hidden): '
+                    f'{notfound}')
+
+            if nosuccess:
+                log(f'WARNING Samples not marked "import_success": '
+                    f'{nosuccess}')
+
+            if unchanged:
+                log(f'Data for {unchanged} listed samples remain unchanged.')
+
+            missing_or_bad = self.exclude(pk__in=good_seen)
+            if missing_or_bad.exists():
+                log(f'WARNING The DB has {missing_or_bad.count()} samples '
+                    f'which are missing from {source_file} or which had '
+                    f'to be skipped for other reasons.')
+
+            missing = self.exclude(sample_id__in=samp_id_seen)
+            if missing.exists():
+                log(f'WARNING The DB has {missing.count()} samples not at all '
+                    f'listed in {source_file}')

     def get_metagenomic_loader_script(self):
         """
