Source code for temo.fit.spawn
"""
A convenience module for forking multiple processes and capturing stdout,
for use in code developed for:
Ian H. Bell and Eric W. Lemmon, "Automatic fitting of binary interaction parameters for multi-fluid
Helmholtz-energy-explicit mixture models", 2016
No dependencies aside from standard libraries included in python
By Ian H. Bell, NIST (ian.bell@nist.gov)
LICENSE: public domain, but please reference paper
"""
from __future__ import print_function
import sys, traceback, time
from multiprocessing import Process, Pipe
[docs]
class RedirectText2Pipe(object):
"""
An text output redirector
"""
def __init__(self, pipe_inlet, file_object = None, prefix = ''):
self.pipe_inlet = pipe_inlet
self.prefix = ''
self.file_object = file_object
[docs]
def write(self, string):
if string.strip():
self.pipe_inlet.send(str(string))
if self.file_object is not None:
self.file_object.write(prefix + str(string))
[docs]
def flush(self):
return None
[docs]
class Guppy(Process):
def __init__(self, pipe_results, target, *args, **kwargs):
Process.__init__(self)
self.pipe_stdio = kwargs.pop('pipe_stdio', None)
self.pipe_results = pipe_results
self.target = target
self.args = args
self.kwargs = kwargs
[docs]
def run(self):
if self.pipe_stdio is not None:
redir = RedirectText2Pipe(self.pipe_stdio)
sys.stdout = redir
sys.stderr = redir
try:
res = self.target(*self.args, **self.kwargs)
self.pipe_results.send(res)
except BaseException as BE:
traceback.print_exc()
self.done()
[docs]
def done(self):
self.pipe_results.send('p'+str(self.pid)+' DONE')
[docs]
class Spawner(object):
def __init__(self, inputs, Nproc_max = 1):
"""
Initialize the Spawner class that manages the processes that are spun off
"""
self.processes = []
self.inputs = inputs
self.Nproc_max = Nproc_max
[docs]
def add_process(self):
"""
If an input is waiting in the queue and a slot has opened up, add the process
"""
if self.inputs and len(self.processes) < self.Nproc_max:
input = self.inputs.pop(0)
p = {}
p['pipe_mine'], p['pipe_theirs'] = Pipe()
p['pipe_stdio_mine'], p['pipe_stdio_theirs'] = Pipe()
kwargs = input.get('kwargs',{})
kwargs['pipe_stdio'] = p['pipe_stdio_theirs']
p['proc'] = Guppy(p['pipe_theirs'], input['target'], *input['args'], **kwargs)
p['proc'].daemon = False
p['proc'].start()
self.processes.append(p)
print(len(self.inputs), ' inputs remain')
else:
return
[docs]
def run(self):
results = []
while self.inputs or self.processes:
self.add_process()
for p in self.processes:
if p['pipe_stdio_mine'].poll():
print('p', p['proc'].pid, '>', p['pipe_stdio_mine'].recv())
if p['pipe_mine'].poll():
res = p['pipe_mine'].recv()
if res == 'p'+str(p['proc'].pid)+' DONE':
while p['pipe_stdio_mine'].poll():
print('p', p['proc'].pid, '>', p['pipe_stdio_mine'].recv())
p['proc'].join()
self.processes.pop(self.processes.index(p))
#print 'process w/ pid ' + str(p['proc'].pid) + ' is done'
else:
results.append(res)
time.sleep(0.01) # This time keeps the main thread from locking up because
# it is constantly trying to read the pipes
return results
[docs]
def f(x):
print('hi')
return sum(x)
if __name__=='__main__':
# Call the process directly without redirecting stdout
p = {}
p['pipe_mine'], p['pipe_theirs'] = Pipe()
p['pipe_stdio_mine'], p['pipe_stdio_theirs'] = Pipe()
g = Guppy(p['pipe_theirs'], f, [1,2,3])
g.run()
print(p['pipe_mine'].recv())
# Call the spawner
spawner = Spawner([dict(target = f, args = (range(1, n),)) for n in range(500)], Nproc_max = 6)
print(spawner.run())