It is possible to create process pipelines using subprocess.Popen, simply by using stdout=subprocess.PIPE and stdin=otherproc.stdout.
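For instance, a naive two-stage pipeline can be wired up like this (a minimal sketch; ls and grep are just placeholder commands):

import subprocess

# Connect "ls -l | grep py" by hand: the first process writes to a pipe,
# the second process reads from the other end of that same pipe.
p1 = subprocess.Popen(["ls", "-l"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["grep", "py"], stdin=p1.stdout, stdout=subprocess.PIPE)
print p2.communicate()[0]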
Almost.
In a pipeline created this way, the stdout of every process except the last is open twice: once in the script that spawned the subprocesses, and once as the standard input of the next process in the pipeline. This is a problem because, if a process closes its stdin, the previous process in the pipeline does not get SIGPIPE when it tries to write to its stdout, because that pipe is still open in the caller process. When that happens, a wait on the writing process hangs forever: the child waits for the parent to read its stdout, and the parent waits for the child to exit.
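A minimal way to reproduce the hang, assuming the usual yes and head commands are available: head exits after one line, but yes never receives SIGPIPE because the caller still holds the read end of its stdout pipe.

import subprocess

p1 = subprocess.Popen(["yes"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["head", "-n1"], stdin=p1.stdout, stdout=subprocess.PIPE)
print p2.communicate()[0]       # prints the single "y" that head lets through
# head has exited and closed its stdin, but the read end of p1's pipe is
# still open here in the caller, so "yes" blocks forever on write instead
# of dying with SIGPIPE:
p1.wait()                       # hangs forever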
The trick is for the caller to close the stdout of each process in the pipeline except the last, right after spawning the process that reads from it:
#!/usr/bin/python
# coding=utf-8
import subprocess

def pipe(*args):
    '''
    Takes as parameters several dicts, each holding the keyword
    arguments for one subprocess.Popen call.

    Runs the various processes in a pipeline, connecting
    the stdout of every process except the last with the
    stdin of the next process.
    '''
    if len(args) < 2:
        raise ValueError("pipe needs at least 2 processes")
    # Set stdout=PIPE in every subprocess except the last
    for i in args[:-1]:
        i["stdout"] = subprocess.PIPE
    # Run all subprocesses connecting stdins and stdouts to create the
    # pipeline.  Close stdouts to avoid deadlocks.
    popens = [subprocess.Popen(**args[0])]
    for i in range(1, len(args)):
        args[i]["stdin"] = popens[i - 1].stdout
        popens.append(subprocess.Popen(**args[i]))
        popens[i - 1].stdout.close()
    # Return the array of subprocesses just created
    return popens
At this point, it's nice to write a function that waits for the whole pipeline to terminate and returns an array of result codes:
def pipe_wait(popens):
    '''
    Given an array of Popen objects returned by the
    pipe function, wait for all processes to terminate
    and return the array with their return values.
    '''
    results = [0] * len(popens)
    while popens:
        last = popens.pop(-1)
        results[len(popens)] = last.wait()
    return results
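Popping from the end keeps the bookkeeping simple: after each pop, len(popens) is exactly the index of the process just removed, so every return code lands in the right slot of results.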
And, lo and behold, we can now easily run a pipeline and get the return codes of every single process in it:
process1 = dict(args='sleep 1; grep line2 testfile', shell=True)
process2 = dict(args='awk \'{print $3}\'', shell=True)
process3 = dict(args='true', shell=True)
popens = pipe(process1, process2, process3)
result = pipe_wait(popens)
print result
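Assuming testfile contains a line matching line2, this prints [0, 0, 0]; if grep finds no match, its exit status, and thus the first element, becomes 1 instead.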
Update: Colin Watson suggests an improvement to compensate for Python's nonstandard SIGPIPE handling.
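For reference, a sketch of what that fix usually looks like (assuming this is the improvement meant): Python sets SIGPIPE to SIG_IGN when the interpreter starts, and exec'd children inherit that disposition, so each Popen call in the pipeline should restore the default handler in the child:

import signal
import subprocess

def subprocess_setup():
    # Python installs SIG_IGN for SIGPIPE at interpreter startup and the
    # children we exec inherit it; restore the default so pipeline
    # members die on SIGPIPE as they would under a shell.
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)

# Pass preexec_fn=subprocess_setup to every Popen call in the pipeline:
p = subprocess.Popen(["ls", "-l"], stdout=subprocess.PIPE,
                     preexec_fn=subprocess_setup)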
Colin Watson has a similar library for C.