# Source code for alphatwirl.concurrently.MultiprocessingDropbox

# Tai Sakuma <tai.sakuma@gmail.com>
from __future__ import print_function
import logging
import multiprocessing
import threading

from operator import itemgetter
from collections import deque

from ..progressbar import NullProgressMonitor
from .TaskPackage import TaskPackage

from .Worker import Worker

##__________________________________________________________________||
# https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
def logger_thread(queue):
    """Drain log records from *queue* and dispatch each to its named logger.

    Runs until a ``None`` sentinel is received on the queue, which signals
    shutdown. Intended to run in a dedicated listener thread so that worker
    processes can log through a shared queue.
    """
    # iter() with a sentinel: keep pulling records until None arrives.
    for record in iter(queue.get, None):
        logging.getLogger(record.name).handle(record)

##__________________________________________________________________||
class MultiprocessingDropbox(object):
    """Dispatch task packages to a pool of worker processes and collect results.

    Packages are sent to ``Worker`` processes through a joinable task queue;
    each result comes back on a result queue as a ``(task_idx, result)`` pair
    so results can be returned in submission order. A dedicated listener
    thread forwards log records emitted by the workers (see ``logger_thread``).
    """

    def __init__(self, nprocesses=16, progressMonitor=None):
        """Initialize the dropbox.

        Parameters
        ----------
        nprocesses : int
            The maximum number of worker processes. Must be positive.
        progressMonitor : object, optional
            Progress monitor used to create one reporter per worker.
            Defaults to ``NullProgressMonitor``.

        Raises
        ------
        ValueError
            If ``nprocesses`` is not positive.
        """
        if nprocesses <= 0:
            raise ValueError("nprocesses must be at least one: {} is given".format(nprocesses))
        self.progressMonitor = NullProgressMonitor() if progressMonitor is None else progressMonitor
        self.n_max_workers = nprocesses
        self.workers = [ ]
        self.task_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.Queue()
        self.logging_queue = multiprocessing.Queue()
        self.lock = multiprocessing.Lock()
        self.n_ongoing_tasks = 0
        self.task_idx = -1 # so it starts from 0

        # Fix: initialize here (as well as in open()) so that poll(),
        # receive_one(), and receive() do not raise AttributeError when
        # called before open().
        self.to_return = deque()

    def __repr__(self):
        return '{}(progressMonitor={!r}, n_max_workers={!r}, n_ongoing_tasks={!r}, task_idx={!r})'.format(
            self.__class__.__name__,
            self.progressMonitor,
            self.n_max_workers,
            self.n_ongoing_tasks,
            self.task_idx
        )

    def open(self):
        """Start the logging listener thread and the worker processes.

        Does nothing if the workers have already been started.
        """
        if len(self.workers) >= self.n_max_workers:
            # workers already created
            return

        # start logging listener
        self.loggingListener = threading.Thread(
            target=logger_thread,
            args=(self.logging_queue, )
        )
        self.loggingListener.start()

        # start workers
        for i in range(self.n_max_workers):
            worker = Worker(
                task_queue=self.task_queue,
                result_queue=self.result_queue,
                logging_queue=self.logging_queue,
                progressReporter=self.progressMonitor.createReporter(),
                lock=self.lock
            )
            worker.start()
            self.workers.append(worker)

        # buffer of (task_idx, result) pairs already pulled off the
        # result queue but not yet returned to the caller
        self.to_return = deque()

    def put(self, package):
        """Submit one task package and return its task index."""
        self.task_idx += 1
        self.task_queue.put((self.task_idx, package))
        self.n_ongoing_tasks += 1
        return self.task_idx

    def put_multiple(self, packages):
        """Submit task packages and return the list of their task indices."""
        task_idxs = [ ]
        for package in packages:
            task_idxs.append(self.put(package))
        return task_idxs

    def poll(self):
        """Return pairs of task indices and results of finished tasks.

        Non-blocking: returns only what has already finished.
        """
        messages = list(self.to_return) # a list of (task_idx, result)
        self.to_return.clear()
        messages.extend(self._receive_finished())

        # sort in the order of task_idx
        messages = sorted(messages, key=itemgetter(0))

        return messages

    def receive_one(self):
        """Return a pair of a package index and a result.

        This method waits until a task finishes.
        This method returns None if no task is running.
        """
        if self.to_return:
            return self.to_return.popleft()

        if self.n_ongoing_tasks == 0:
            return None

        # Fix: block on get() for the next result instead of the original
        # busy-wait loop over the non-blocking _receive_finished(), which
        # spun the CPU until a result arrived.
        message = self.result_queue.get() # (task_idx, result)
        self.n_ongoing_tasks -= 1
        return message

    def receive(self):
        """Return pairs of task indices and results.

        This method waits until all tasks finish.
        """
        messages = list(self.to_return) # a list of (task_idx, result)
        self.to_return.clear()
        while self.n_ongoing_tasks >= 1:
            # Fix: block on get() for the next result instead of the
            # original busy-wait, which spun the CPU while the queue was
            # empty. n_ongoing_tasks >= 1 guarantees a message will arrive.
            messages.append(self.result_queue.get())
            self.n_ongoing_tasks -= 1
            # drain anything else that has already finished
            messages.extend(self._receive_finished())

        # sort in the order of task_idx
        messages = sorted(messages, key=itemgetter(0))

        return messages

    def _receive_finished(self):
        """Drain currently-available results without blocking.

        NOTE: Queue.empty() is only approximate for multiprocessing queues;
        callers must not rely on this method returning every in-flight
        result, only on it never blocking.
        """
        messages = [ ] # a list of (task_idx, result)
        while not self.result_queue.empty():
            message = self.result_queue.get()
            messages.append(message)
            self.n_ongoing_tasks -= 1
        return messages

    def terminate(self):
        """Terminate all worker processes and wait until they have exited."""
        for worker in self.workers:
            worker.terminate()

        # Fix: join() each process instead of the original busy-wait
        # (``while any([w.is_alive() for w in self.workers]): pass``),
        # which spun the CPU until the workers died.
        for worker in self.workers:
            worker.join()

        self.workers = [ ]

    def close(self):
        """Shut down the workers and the logging listener cleanly."""

        # end workers
        if self.workers:
            for i in range(len(self.workers)):
                self.task_queue.put(None) # one sentinel per worker
            self.task_queue.join()
            self.workers = [ ]

        # end logging listener
        # Fix: guard against close() being called before open(), in which
        # case the listener thread was never started and the attribute
        # does not exist.
        if getattr(self, 'loggingListener', None) is not None:
            self.logging_queue.put(None) # sentinel stops logger_thread
            self.loggingListener.join()
            self.loggingListener = None
##__________________________________________________________________||