Source code for alphatwirl.concurrently.MultiprocessingDropbox

# Tai Sakuma <tai.sakuma@gmail.com>
import mantichora

from alphatwirl.misc.deprecation import _deprecated_class_method_option

##__________________________________________________________________||
[docs]class MultiprocessingDropbox: """A drop box for task packages. The tasks will be executed in multiprocessing The original implementation of this class has been released as an independent package: https://github.com/alphatwirl/mantichora The current implementation uses mantichora. Parameters ---------- nprocesses : int The number of processes """ @_deprecated_class_method_option('progressbar', msg='use atpbar.disable() instead to turn off progress bars') # after v0.23.3 (2019-03-13) @_deprecated_class_method_option('progressMonitor') def __init__(self, nprocesses=16, progressMonitor=None, progressbar=True): if nprocesses <= 0: raise ValueError("nprocesses must be at least one: {} is given".format(nprocesses)) self.nprocesses = nprocesses self.mcore = None self._repr_pairs = [ ('nprocesses', nprocesses), ] def __repr__(self): return '{}({})'.format( self.__class__.__name__, ', '.join(['{}={!r}'.format(n, v) for n, v in self._repr_pairs]), ) def open(self): if self.mcore is not None: # already open return self.mcore = mantichora.mantichora(nworkers=self.nprocesses) def terminate(self): if self.mcore is None: return self.mcore.terminate() def close(self): if self.mcore is None: return self.mcore.end() self.mcore = None def put(self, package): return self.mcore.run(package) def put_multiple(self, packages): task_idxs = [ ] for p in packages: task_idxs.append(self.mcore.run(p)) return task_idxs def poll(self): return self.mcore.receive_finished() def receive_one(self): return self.mcore.receive_one() def receive(self): return self.mcore.receive_all()
##__________________________________________________________________||