# Tai Sakuma <tai.sakuma@gmail.com>
import logging
##__________________________________________________________________||
class TaskPackage:
"""A task package
Note: This class will be replaced with `functools.partial`
"""
def __init__(self, task, args, kwargs):
self.task = task
self.args = args
self.kwargs = kwargs
def __call__(self):
return self.task(*self.args, **self.kwargs)
##__________________________________________________________________||
[docs]class CommunicationChannel:
"""A communication channel with workers in other processes.
(This docstring is outdated.)
You can send tasks to workers through this channel. The workers,
running in other processes, execute the tasks in the background.
You can receive the results of the tasks also through this
channel.
An instance of this class can be created with two optional
arguments: ``nprocesses``, the number of workers to be created,
and ``progressMonitor``::
progressBar = ProgressBar()
progressMonitor = BProgressMonitor(progressBar)
channel = CommunicationChannel(nprocesses=10, progressMonitor=progressMonitor)
Workers will be created when ``begin()`` is called::
channel.begin()
In ``begin()``, this class gives each worker a
``progressReporter``, which is created by the ``progressMonitor``.
Now, you are ready to send a task. A task is a function or any
object which is callable and picklable. A task can take any number
of arguments. If a task takes a named argument
``progressReporter``, the worker will give the
``progressReporter`` to the task. A value that a task returns is
the result of the task and must be picklable. For example, an
instance of ``EventLoop`` can be a task. You can send a task with
the method ``put``::
channel.put(task1, 10, 20, A=30)
Here, 10, 20, A=30 are the arguments to the task.
This class sends the task to a worker. The worker which receives
the task will first try to call the task with the
``progressReporter`` in addition to the arguments. If the task
doesn't take the ``progressReporter``, it calls only with the
arguments.
You can send multiple tasks::
channel.put(task2)
channel.put(task3, 100, 200)
channel.put(task4, A='abc')
channel.put(task5)
They will be executed by workers.
You can receive the results of the tasks with the method
``receive()``::
results = channel.receive()
This method will wait until all tasks are finished. If a task
gives a ``progressReport`` to the ``progressReporter``, the report
will be used, for example, to update progress bars on the screen.
When all tasks end, results will be returned. The return value
``results`` is a list of results of the tasks. They are sorted in
the order in which the tasks are originally put.
After receiving the results, you can put more tasks::
channel.put(task6)
channel.put(task7)
And you can receive the results of them::
results = channel.receive()
If there are no more tasks to be done, you should call the method
``end``::
channel.end()
This will end all background processes.
"""
[docs] def __init__(self, dropbox):
self.dropbox = dropbox
self.isopen = False
def __repr__(self):
return '{}(dropbox={!r}, isopen={!r})'.format(
self.__class__.__name__,
self.dropbox,
self.isopen
)
def begin(self):
"""begin
"""
if self.isopen: return
self.dropbox.open()
self.isopen = True
def put(self, task, *args, **kwargs):
"""put a task and its arguments
If you need to put multiple tasks, it can be faster to put
multiple tasks with `put_multiple()` than to use this method
multiple times.
Parameters
----------
task : a function
A function to be executed
args : list
A list of positional arguments to the `task`
kwargs : dict
A dict with keyword arguments to the `task`
Returns
-------
int, str, or any hashable and sortable
A task ID. IDs are sortable in the order in which the
corresponding tasks are put.
"""
if not self.isopen:
logger = logging.getLogger(__name__)
logger.warning('the drop box is not open')
return
package = TaskPackage(task=task, args=args, kwargs=kwargs)
return self.dropbox.put(package)
def put_multiple(self, task_args_kwargs_list):
"""put a list of tasks and their arguments
This method can be used to put multiple tasks at once. Calling
this method once with multiple tasks can be much faster than
calling `put()` multiple times.
Parameters
----------
task_args_kwargs_list : list
A list of lists with three items that can be parameters of
`put()`, i.e., `task`, `args`, `kwargs`.
Returns
-------
list
A list of task IDs.
"""
if not self.isopen:
logger = logging.getLogger(__name__)
logger.warning('the drop box is not open')
return
packages = [ ]
for t in task_args_kwargs_list:
try:
task = t['task']
args = t.get('args', ())
kwargs = t.get('kwargs', {})
package = TaskPackage(task=task, args=args, kwargs=kwargs)
except TypeError:
package = TaskPackage(task=t, args=(), kwargs={})
packages.append(package)
return self.dropbox.put_multiple(packages)
def receive_finished(self):
"""return a list of pairs of IDs and results of finished tasks.
This method doesn't wait for tasks to finish. It returns IDs
and results which have already finished.
Returns
-------
list
A list of pairs of IDs and results
"""
if not self.isopen:
logger = logging.getLogger(__name__)
logger.warning('the drop box is not open')
return
return self.dropbox.poll()
def receive_one(self):
"""return a pair of an ID and a result of a task.
This method waits for a task to finish.
Returns
-------
An ID and a result of a task. `None` if no task is running.
"""
if not self.isopen:
logger = logging.getLogger(__name__)
logger.warning('the drop box is not open')
return
return self.dropbox.receive_one()
def receive_all(self):
"""return a list of pairs of IDs and results of all tasks.
This method waits for all tasks to finish.
Returns
-------
list
A list of pairs of IDs and results
"""
if not self.isopen:
logger = logging.getLogger(__name__)
logger.warning('the drop box is not open')
return
return self.dropbox.receive()
def receive(self):
"""return a list results of all tasks.
This method waits for all tasks to finish.
Returns
-------
list
A list of results of the tasks. The results are sorted in
the order in which the tasks are put.
"""
pkgidx_result_pairs = self.receive_all()
if pkgidx_result_pairs is None:
return
results = [r for _, r in pkgidx_result_pairs]
return results
def terminate(self):
"""terminate
"""
self.dropbox.terminate()
def end(self):
"""end
"""
if not self.isopen: return
self.dropbox.close()
self.isopen = False
##__________________________________________________________________||