Source code for alphatwirl.concurrently.TaskPackageDropbox

# Tai Sakuma <tai.sakuma@cern.ch>
import logging
import time
from operator import itemgetter

from .WorkingArea import WorkingArea

##__________________________________________________________________||
class TaskPackageDropbox(object):
    """A drop box for task packages.

    It puts task packages in a working area and dispatches runners
    that execute the tasks.

    Parameters
    ----------
    workingArea : object
        Storage for packages and results. This class calls its
        ``open()``, ``put_package(package)``, ``collect_result(index)``,
        ``package_path(index)`` and ``close()`` methods.
    dispatcher : object
        Runs the packages. This class calls its
        ``run(workingArea, index)``, ``poll()``, ``failed_runids(ids)``
        and ``terminate()`` methods.
    sleep : number, optional
        Seconds to wait between two polls of the dispatcher while
        tasks are still outstanding (passed to ``time.sleep``).

    """

    def __init__(self, workingArea, dispatcher, sleep = 5):
        self.workingArea = workingArea
        self.dispatcher = dispatcher
        self.sleep = sleep

        # run id -> package index of the tasks that are still outstanding.
        # Created here so that the attribute exists even before open() is
        # called; open() resets it.
        self.runid_package_index_map = { }

    def __repr__(self):
        return '{}(workingArea = {!r}, dispatcher = {!r}, sleep = {!r})'.format(
            self.__class__.__name__,
            self.workingArea,
            self.dispatcher,
            self.sleep
        )

    def open(self):
        """Open the working area and forget any previously tracked runs."""
        self.workingArea.open()
        self.runid_package_index_map = { }

    def put(self, package):
        """Store a package in the working area and start running it.

        The run id returned by the dispatcher is remembered together
        with the package index so that ``receive()`` can collect the
        result.
        """
        package_index = self.workingArea.put_package(package)
        runid = self.dispatcher.run(self.workingArea, package_index)
        self.runid_package_index_map[runid] = package_index

    def receive(self):
        """Wait for all outstanding tasks and return their results.

        The dispatcher is polled until no run is outstanding. A result
        of ``None`` from the working area is treated as a failed run:
        the dispatcher is told about the failed run ids and the package
        is submitted again.

        On ``KeyboardInterrupt`` a warning is logged, the dispatcher is
        terminated, and the results collected so far are returned.

        Returns
        -------
        list
            The results, ordered by package index.

        """
        logger = logging.getLogger(__name__)

        pkgidx_result_pairs = [ ] # a list of (package_index, result)

        try:
            while self.runid_package_index_map:

                finished_runids = self.dispatcher.poll()
                # e.g., [1001, 1003]

                runid_pkgidx = [
                    (runid, self.runid_package_index_map.pop(runid))
                    for runid in finished_runids
                ]
                # e.g., [(1001, 0), (1003, 2)]

                runid_pkgidx_result = [
                    (runid, pkgidx, self.workingArea.collect_result(pkgidx))
                    for runid, pkgidx in runid_pkgidx
                ]
                # e.g., [(1001, 0, result0), (1003, 2, None)]
                # None indicates the job failed

                # split on the result itself rather than testing membership
                # in the list of failed entries, which is quadratic
                failed = [e for e in runid_pkgidx_result if e[2] is None]
                succeeded = [e for e in runid_pkgidx_result if e[2] is not None]

                # let the dispatcher know the failed runid
                self.dispatcher.failed_runids([e[0] for e in failed])

                # rerun failed jobs
                for _, pkgidx, _ in failed:
                    logger.warning('resubmitting {}'.format(self.workingArea.package_path(pkgidx)))
                    runid = self.dispatcher.run(self.workingArea, pkgidx)
                    self.runid_package_index_map[runid] = pkgidx

                # only successful ones
                pkgidx_result_pairs.extend(
                    (pkgidx, result) for _, pkgidx, result in succeeded
                )

                # wait only if there is still something to wait for;
                # sleeping after the last result only delays the return
                if self.runid_package_index_map:
                    time.sleep(self.sleep)

        except KeyboardInterrupt:
            logger.warning('received KeyboardInterrupt')
            self.dispatcher.terminate()

        # sort in the order of package_index
        pkgidx_result_pairs.sort(key = itemgetter(0))
        return [result for _, result in pkgidx_result_pairs]

    def close(self):
        """Terminate the dispatcher and close the working area.

        The working area is closed even if terminating the dispatcher
        raises; the exception still propagates.
        """
        try:
            self.dispatcher.terminate()
        finally:
            self.workingArea.close()
##__________________________________________________________________||