# Source code for alphatwirl.concurrently.MultiprocessingDropbox

# Tai Sakuma <tai.sakuma@gmail.com>
import multiprocessing
from operator import itemgetter

from ..progressbar import NullProgressMonitor
from .TaskPackage import TaskPackage

from .Worker import Worker

##__________________________________________________________________||
class MultiprocessingDropbox(object):
    """Hand task packages to worker processes and collect their results.

    Packages are placed on a shared task queue together with a running
    index; results are read back from a result queue as
    ``(task_idx, result)`` pairs and returned ordered by that index.

    Typical call sequence: ``open()``, any number of ``put()`` calls,
    ``receive()``, then ``close()``.

    Args:
        nprocesses (int): Maximum number of worker processes to start.
            Must be at least one.
        progressMonitor: Object providing ``createReporter()``, called
            once per worker in ``open()``. A ``NullProgressMonitor`` is
            used when ``None`` is given.

    Raises:
        ValueError: If ``nprocesses`` is zero or negative.
    """

    def __init__(self, nprocesses=16, progressMonitor=None):

        if nprocesses <= 0:
            raise ValueError("nprocesses must be at least one: {} is given".format(nprocesses))

        self.progressMonitor = NullProgressMonitor() if progressMonitor is None else progressMonitor

        self.n_max_workers = nprocesses
        self.n_workers = 0  # number of workers started so far
        self.task_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.Queue()
        self.lock = multiprocessing.Lock()  # handed to every worker
        self.n_ongoing_tasks = 0  # tasks put but not yet received
        self.task_idx = -1  # so it starts from 0

    def __repr__(self):
        return '{}(progressMonitor={!r}, n_max_workers={!r}, n_workers={!r}, n_ongoing_tasks={!r}, task_idx={!r})'.format(
            self.__class__.__name__,
            self.progressMonitor,
            self.n_max_workers,
            self.n_workers,
            self.n_ongoing_tasks,
            self.task_idx
        )

    def open(self):
        """Start worker processes until ``n_max_workers`` are running.

        Does nothing if the workers have already been created.
        """

        if self.n_workers >= self.n_max_workers:
            # workers already created
            return

        # start workers
        for _ in range(self.n_workers, self.n_max_workers):
            worker = Worker(
                task_queue=self.task_queue,
                result_queue=self.result_queue,
                progressReporter=self.progressMonitor.createReporter(),
                lock=self.lock
            )
            worker.start()
            self.n_workers += 1

    def put(self, package):
        """Queue a task package for execution.

        The package is put on the task queue as ``(task_idx, package)``,
        where ``task_idx`` counts up from 0 over the life of this object.

        Args:
            package: The task package to be picked up by a worker.
        """

        self.task_idx += 1
        self.task_queue.put((self.task_idx, package))
        self.n_ongoing_tasks += 1

    def receive(self):
        """Wait for all outstanding tasks and return their results.

        Blocks until one message has been read from the result queue for
        every task that was put and not yet received.

        Returns:
            list: The results, sorted in the order of ``task_idx``. An
            empty list if no tasks are outstanding.
        """

        messages = []  # a list of (task_idx, result)
        while self.n_ongoing_tasks >= 1:
            # get() blocks until a message is available, so the queue
            # does not need to be polled with empty() in a busy loop.
            messages.append(self.result_queue.get())
            self.n_ongoing_tasks -= 1

        # sort in the order of task_idx
        messages.sort(key=itemgetter(0))

        return [result for _, result in messages]

    def terminate(self):
        """Do nothing; present as part of the dropbox interface."""
        pass

    def close(self):
        """Signal every worker to end and wait for the task queue to drain.

        One ``None`` is put on the task queue per started worker, then the
        call blocks in ``task_queue.join()`` until every item put on the
        queue has been marked done.
        """

        for _ in range(self.n_workers):
            self.task_queue.put(None)  # end workers
        self.task_queue.join()
        self.n_workers = 0
##__________________________________________________________________||