Skip to content

Commit c4945eb

Browse files
authored
Merge pull request #2777 from oesteban/enh/process-spawn-2
[MAINT] Move ``interfaces.base.run_command`` to ``nipype.utils.subprocess``
2 parents e475331 + 2284f4b commit c4945eb

File tree

3 files changed

+208
-192
lines changed

3 files changed

+208
-192
lines changed

nipype/interfaces/base/core.py

Lines changed: 4 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@
1717

1818
from builtins import object, open, str, bytes
1919

20-
import gc
2120
from copy import deepcopy
2221
from datetime import datetime as dt
23-
import errno
2422
import os
2523
import re
2624
import platform
27-
import select
2825
import subprocess as sp
2926
import shlex
3027
import sys
@@ -35,16 +32,16 @@
3532
from ... import config, logging, LooseVersion
3633
from ...utils.provenance import write_provenance
3734
from ...utils.misc import trim, str2bool, rgetcwd
38-
from ...utils.filemanip import (FileNotFoundError, split_filename, read_stream,
39-
which, get_dependencies, canonicalize_env as
40-
_canonicalize_env)
35+
from ...utils.filemanip import (FileNotFoundError, split_filename,
36+
which, get_dependencies)
37+
from ...utils.subprocess import run_command
4138

4239
from ...external.due import due
4340

4441
from .traits_extension import traits, isdefined, TraitError
4542
from .specs import (BaseInterfaceInputSpec, CommandLineInputSpec,
4643
StdOutCommandLineInputSpec, MpiCommandLineInputSpec)
47-
from .support import (Bunch, Stream, InterfaceResult, NipypeInterfaceError)
44+
from .support import (Bunch, InterfaceResult, NipypeInterfaceError)
4845

4946
from future import standard_library
5047
standard_library.install_aliases()
@@ -732,135 +729,6 @@ def _list_outputs(self):
732729
return self._results
733730

734731

735-
def run_command(runtime, output=None, timeout=0.01):
736-
"""Run a command, read stdout and stderr, prefix with timestamp.
737-
738-
The returned runtime contains a merged stdout+stderr log with timestamps
739-
"""
740-
741-
# Init variables
742-
cmdline = runtime.cmdline
743-
env = _canonicalize_env(runtime.environ)
744-
745-
errfile = None
746-
outfile = None
747-
stdout = sp.PIPE
748-
stderr = sp.PIPE
749-
750-
if output == 'file':
751-
outfile = os.path.join(runtime.cwd, 'output.nipype')
752-
stdout = open(outfile, 'wb') # t=='text'===default
753-
stderr = sp.STDOUT
754-
elif output == 'file_split':
755-
outfile = os.path.join(runtime.cwd, 'stdout.nipype')
756-
stdout = open(outfile, 'wb')
757-
errfile = os.path.join(runtime.cwd, 'stderr.nipype')
758-
stderr = open(errfile, 'wb')
759-
elif output == 'file_stdout':
760-
outfile = os.path.join(runtime.cwd, 'stdout.nipype')
761-
stdout = open(outfile, 'wb')
762-
elif output == 'file_stderr':
763-
errfile = os.path.join(runtime.cwd, 'stderr.nipype')
764-
stderr = open(errfile, 'wb')
765-
766-
proc = sp.Popen(
767-
cmdline,
768-
stdout=stdout,
769-
stderr=stderr,
770-
shell=True,
771-
cwd=runtime.cwd,
772-
env=env,
773-
close_fds=(not sys.platform.startswith('win')),
774-
)
775-
776-
result = {
777-
'stdout': [],
778-
'stderr': [],
779-
'merged': [],
780-
}
781-
782-
if output == 'stream':
783-
streams = [
784-
Stream('stdout', proc.stdout),
785-
Stream('stderr', proc.stderr)
786-
]
787-
788-
def _process(drain=0):
789-
try:
790-
res = select.select(streams, [], [], timeout)
791-
except select.error as e:
792-
iflogger.info(e)
793-
if e[0] == errno.EINTR:
794-
return
795-
else:
796-
raise
797-
else:
798-
for stream in res[0]:
799-
stream.read(drain)
800-
801-
while proc.returncode is None:
802-
proc.poll()
803-
_process()
804-
805-
_process(drain=1)
806-
807-
# collect results, merge and return
808-
result = {}
809-
temp = []
810-
for stream in streams:
811-
rows = stream._rows
812-
temp += rows
813-
result[stream._name] = [r[2] for r in rows]
814-
temp.sort()
815-
result['merged'] = [r[1] for r in temp]
816-
817-
if output.startswith('file'):
818-
proc.wait()
819-
if outfile is not None:
820-
stdout.flush()
821-
stdout.close()
822-
with open(outfile, 'rb') as ofh:
823-
stdoutstr = ofh.read()
824-
result['stdout'] = read_stream(stdoutstr, logger=iflogger)
825-
del stdoutstr
826-
827-
if errfile is not None:
828-
stderr.flush()
829-
stderr.close()
830-
with open(errfile, 'rb') as efh:
831-
stderrstr = efh.read()
832-
result['stderr'] = read_stream(stderrstr, logger=iflogger)
833-
del stderrstr
834-
835-
if output == 'file':
836-
result['merged'] = result['stdout']
837-
result['stdout'] = []
838-
else:
839-
stdout, stderr = proc.communicate()
840-
if output == 'allatonce': # Discard stdout and stderr otherwise
841-
result['stdout'] = read_stream(stdout, logger=iflogger)
842-
result['stderr'] = read_stream(stderr, logger=iflogger)
843-
844-
runtime.returncode = proc.returncode
845-
try:
846-
proc.terminate() # Ensure we are done
847-
except OSError as error:
848-
# Python 2 raises when the process is already gone
849-
if error.errno != errno.ESRCH:
850-
raise
851-
852-
# Dereference & force GC for a cleanup
853-
del proc
854-
del stdout
855-
del stderr
856-
gc.collect()
857-
858-
runtime.stderr = '\n'.join(result['stderr'])
859-
runtime.stdout = '\n'.join(result['stdout'])
860-
runtime.merged = '\n'.join(result['merged'])
861-
return runtime
862-
863-
864732
class CommandLine(BaseInterface):
865733
"""Implements functionality to interact with command line programs
866734
class must be instantiated with a command argument

nipype/interfaces/base/support.py

Lines changed: 1 addition & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,11 @@
99
"""
1010
from __future__ import (print_function, division, unicode_literals,
1111
absolute_import)
12-
from builtins import range, object, str
12+
from builtins import object, str
1313

1414
import os
1515
from copy import deepcopy
1616

17-
import datetime
18-
import locale
19-
2017
from ... import logging
2118
from ...utils.misc import is_container
2219
from ...utils.filemanip import md5, to_str, hash_infile
@@ -238,58 +235,6 @@ def version(self):
238235
return self._version
239236

240237

241-
class Stream(object):
242-
"""Function to capture stdout and stderr streams with timestamps
243-
244-
stackoverflow.com/questions/4984549/merge-and-sync-stdout-and-stderr/5188359
245-
"""
246-
247-
def __init__(self, name, impl):
248-
self._name = name
249-
self._impl = impl
250-
self._buf = ''
251-
self._rows = []
252-
self._lastidx = 0
253-
self.default_encoding = locale.getdefaultlocale()[1] or 'UTF-8'
254-
255-
def fileno(self):
256-
"Pass-through for file descriptor."
257-
return self._impl.fileno()
258-
259-
def read(self, drain=0):
260-
"Read from the file descriptor. If 'drain' set, read until EOF."
261-
while self._read(drain) is not None:
262-
if not drain:
263-
break
264-
265-
def _read(self, drain):
266-
"Read from the file descriptor"
267-
fd = self.fileno()
268-
buf = os.read(fd, 4096).decode(self.default_encoding)
269-
if not buf and not self._buf:
270-
return None
271-
if '\n' not in buf:
272-
if not drain:
273-
self._buf += buf
274-
return []
275-
276-
# prepend any data previously read, then split into lines and format
277-
buf = self._buf + buf
278-
if '\n' in buf:
279-
tmp, rest = buf.rsplit('\n', 1)
280-
else:
281-
tmp = buf
282-
rest = None
283-
self._buf = rest
284-
now = datetime.datetime.now().isoformat()
285-
rows = tmp.split('\n')
286-
self._rows += [(now, '%s %s:%s' % (self._name, now, r), r)
287-
for r in rows]
288-
for idx in range(self._lastidx, len(self._rows)):
289-
iflogger.info(self._rows[idx][1])
290-
self._lastidx = len(self._rows)
291-
292-
293238
def load_template(name):
294239
"""
295240
Deprecated stub for backwards compatibility,

0 commit comments

Comments
 (0)