Skip to content

Commit e5664a2

Browse files
authored
Merge pull request #3152 from satra/fix/et-subprocess
fix: improve version checking for nodes of workflows
2 parents 39012c5 + e2db192 commit e5664a2

File tree

6 files changed

+119
-11
lines changed

6 files changed

+119
-11
lines changed

nipype/__init__.py

+4-6
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,7 @@
1414
import os
1515
from distutils.version import LooseVersion
1616

17-
from .info import (
18-
URL as __url__,
19-
STATUS as __status__,
20-
__version__,
21-
)
17+
from .info import URL as __url__, STATUS as __status__, __version__
2218
from .utils.config import NipypeConfig
2319
from .utils.logger import Logging
2420
from .refs import due
@@ -105,6 +101,8 @@ def check_latest_version(raise_exception=False):
105101
packname="nipype", version=__version__, latest=latest["version"]
106102
)
107103
)
104+
else:
105+
logger.info("No new version available.")
108106
if latest["bad_versions"] and any(
109107
[
110108
LooseVersion(__version__) == LooseVersion(ver)
@@ -126,7 +124,7 @@ def check_latest_version(raise_exception=False):
126124
if config.getboolean("execution", "check_version"):
127125
import __main__
128126

129-
if not hasattr(__main__, "__file__"):
127+
if not hasattr(__main__, "__file__") and "NIPYPE_NO_ET" not in os.environ:
130128
from .interfaces.base import BaseInterface
131129

132130
if BaseInterface._etelemetry_version_data is None:

nipype/interfaces/base/core.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,10 @@ class BaseInterface(Interface):
168168
def __init__(
169169
self, from_file=None, resource_monitor=None, ignore_exception=False, **inputs
170170
):
171-
if config.getboolean("execution", "check_version"):
171+
if (
172+
config.getboolean("execution", "check_version")
173+
and "NIPYPE_NO_ET" not in os.environ
174+
):
172175
from ... import check_latest_version
173176

174177
if BaseInterface._etelemetry_version_data is None:

nipype/pipeline/plugins/legacymultiproc.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,12 @@ class NonDaemonPool(pool.Pool):
153153
Process = NonDaemonProcess
154154

155155

156+
def process_initializer(cwd):
157+
"""Initializes the environment of the child process"""
158+
os.chdir(cwd)
159+
os.environ["NIPYPE_NO_ET"] = "1"
160+
161+
156162
class LegacyMultiProcPlugin(DistributedPluginBase):
157163
"""
158164
Execute workflow with multiprocessing, not sending more jobs at once
@@ -223,7 +229,7 @@ def __init__(self, plugin_args=None):
223229
self.pool = NipypePool(
224230
processes=self.processors,
225231
maxtasksperchild=maxtasks,
226-
initializer=os.chdir,
232+
initializer=process_initializer,
227233
initargs=(self._cwd,),
228234
)
229235
except TypeError:

nipype/pipeline/plugins/multiproc.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
# Import packages
1111
import os
1212
import multiprocessing as mp
13-
from concurrent.futures import ProcessPoolExecutor
13+
from concurrent.futures import ProcessPoolExecutor, wait
1414
from traceback import format_exception
1515
import sys
1616
from logging import INFO
@@ -73,6 +73,12 @@ def run_node(node, updatehash, taskid):
7373
return result
7474

7575

76+
def process_initializer(cwd):
77+
"""Initializes the environment of the child process"""
78+
os.chdir(cwd)
79+
os.environ["NIPYPE_NO_ET"] = "1"
80+
81+
7682
class MultiProcPlugin(DistributedPluginBase):
7783
"""
7884
Execute workflow with multiprocessing, not sending more jobs at once
@@ -134,16 +140,18 @@ def __init__(self, plugin_args=None):
134140
)
135141

136142
try:
137-
mp_context = mp.context.get_context(self.plugin_args.get("mp_context"))
143+
mp_context = mp.get_context(self.plugin_args.get("mp_context"))
138144
self.pool = ProcessPoolExecutor(
139145
max_workers=self.processors,
140-
initializer=os.chdir,
146+
initializer=process_initializer,
141147
initargs=(self._cwd,),
142148
mp_context=mp_context,
143149
)
144150
except (AttributeError, TypeError):
145151
# Python < 3.7 does not support initialization or contexts
146152
self.pool = ProcessPoolExecutor(max_workers=self.processors)
153+
result_future = self.pool.submit(process_initializer, self._cwd)
154+
wait([result_future], timeout=5)
147155

148156
self._stats = None
149157

nipype/pipeline/plugins/tools.py

+6
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,13 @@ def create_pyscript(node, updatehash=False, store_exception=True):
125125
can_import_matplotlib = False
126126
pass
127127
128+
import os
129+
value = os.environ.get('NIPYPE_NO_ET', None)
130+
if value is None:
131+
# disable ET for any submitted job
132+
os.environ['NIPYPE_NO_ET'] = "1"
128133
from nipype import config, logging
134+
129135
from nipype.utils.filemanip import loadpkl, savepkl
130136
from socket import gethostname
131137
from traceback import format_exception

nipype/tests/test_nipype.py

+87
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,90 @@ def test_nipype_info():
1919
def test_git_hash():
2020
# removing the first "g" from gitversion
2121
get_nipype_gitversion()[1:] == get_info()["commit_hash"]
22+
23+
24+
def _check_no_et():
25+
import os
26+
from unittest.mock import patch
27+
28+
et = os.getenv("NIPYPE_NO_ET") is None
29+
30+
with patch.dict("os.environ", {"NIPYPE_NO_ET": "1"}):
31+
from nipype.interfaces.base import BaseInterface
32+
33+
ver_data = BaseInterface._etelemetry_version_data
34+
35+
if et and ver_data is None:
36+
raise ValueError(
37+
"etelemetry enabled and version data missing - double hits likely"
38+
)
39+
40+
return et
41+
42+
43+
def test_no_et(tmp_path):
44+
from unittest.mock import patch
45+
from nipype.pipeline import engine as pe
46+
from nipype.interfaces import utility as niu
47+
from nipype.interfaces.base import BaseInterface
48+
49+
# Pytest doesn't trigger this, so let's pretend it's there
50+
with patch.object(BaseInterface, "_etelemetry_version_data", {}):
51+
52+
# Direct function call - environment not set
53+
f = niu.Function(function=_check_no_et)
54+
res = f.run()
55+
assert res.outputs.out is True
56+
57+
# Basic node - environment not set
58+
n = pe.Node(
59+
niu.Function(function=_check_no_et), name="n", base_dir=str(tmp_path)
60+
)
61+
res = n.run()
62+
assert res.outputs.out is True
63+
64+
# Linear run - environment not set
65+
wf1 = pe.Workflow(name="wf1", base_dir=str(tmp_path))
66+
wf1.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")])
67+
res = wf1.run()
68+
assert next(iter(res.nodes)).result.outputs.out is True
69+
70+
# MultiProc run - environment initialized with NIPYPE_NO_ET
71+
wf2 = pe.Workflow(name="wf2", base_dir=str(tmp_path))
72+
wf2.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")])
73+
res = wf2.run(plugin="MultiProc", plugin_args={"n_procs": 1})
74+
assert next(iter(res.nodes)).result.outputs.out is False
75+
76+
# LegacyMultiProc run - environment initialized with NIPYPE_NO_ET
77+
wf3 = pe.Workflow(name="wf3", base_dir=str(tmp_path))
78+
wf3.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")])
79+
res = wf3.run(plugin="LegacyMultiProc", plugin_args={"n_procs": 1})
80+
assert next(iter(res.nodes)).result.outputs.out is False
81+
82+
# run_without_submitting - environment not set
83+
wf4 = pe.Workflow(name="wf4", base_dir=str(tmp_path))
84+
wf4.add_nodes(
85+
[
86+
pe.Node(
87+
niu.Function(function=_check_no_et),
88+
run_without_submitting=True,
89+
name="n",
90+
)
91+
]
92+
)
93+
res = wf4.run(plugin="MultiProc", plugin_args={"n_procs": 1})
94+
assert next(iter(res.nodes)).result.outputs.out is True
95+
96+
# LegacyMultiProc run - environment initialized with NIPYPE_NO_ET
97+
wf5 = pe.Workflow(name="wf5", base_dir=str(tmp_path))
98+
wf5.add_nodes(
99+
[
100+
pe.Node(
101+
niu.Function(function=_check_no_et),
102+
run_without_submitting=True,
103+
name="n",
104+
)
105+
]
106+
)
107+
res = wf5.run(plugin="LegacyMultiProc", plugin_args={"n_procs": 1})
108+
assert next(iter(res.nodes)).result.outputs.out is True

0 commit comments

Comments
 (0)