Source code for alphatwirl.concurrently.CommunicationChannel0

# Tai Sakuma <tai.sakuma@gmail.com>

##__________________________________________________________________||
[docs]class CommunicationChannel0(object): """A communication channel for the single process mode An alternative to `CommunicationChannel`. However, unlike `CommunicationChannel`, this class does not send tasks to workers. Instead, it directly executes tasks. This class has the same interface as the class `CommunicationChannel`. When this class is used as a substitute for `CommunicationChannel`, the tasks will be sequentially executed in the foreground. """
[docs] def __init__(self): self.taskidx = -1 # so it starts from 0 self.taskidx_result_pairs = [ ]
def __repr__(self): return '{}()'.format( self.__class__.__name__, ) def begin(self): pass def put(self, task, *args, **kwargs): self.taskidx += 1 result = task(*args, **kwargs) self.taskidx_result_pairs.append((self.taskidx, result)) return self.taskidx def put_multiple(self, task_args_kwargs_list): task_idxs = [ ] for t in task_args_kwargs_list: try: task = t['task'] args = t.get('args', ()) kwargs = t.get('kwargs', {}) task_idx = self.put(task, *args, **kwargs) except TypeError: task_idx = self.put(t) task_idxs.append(task_idx) return task_idxs def receive_finished(self): return self.receive_all() def receive_one(self): if self.taskidx_result_pairs: return self.taskidx_result_pairs.pop(0) return None def receive_all(self): ret = self.taskidx_result_pairs[:] del self.taskidx_result_pairs[:] return ret def receive(self): ret = [r for _, r in self.receive_all()] return ret def terminate(self): pass def end(self): pass
##__________________________________________________________________||