alphatwirl.concurrently package

Submodules

alphatwirl.concurrently.CommunicationChannel module

class alphatwirl.concurrently.CommunicationChannel.CommunicationChannel(dropbox)[source]

Bases: object

A communication channel with workers in other processes.

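(This docstring is outdated: the constructor now takes a dropbox, as shown in the signature above, while the description below still refers to the arguments nprocesses and progressMonitor.)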

You can send tasks to workers through this channel. The workers, running in other processes, execute the tasks in the background. You can receive the results of the tasks also through this channel.

An instance of this class can be created with two optional arguments: nprocesses, the number of workers to be created, and progressMonitor:

progressBar = ProgressBar()
progressMonitor = BProgressMonitor(progressBar)
channel = CommunicationChannel(nprocesses=10, progressMonitor=progressMonitor)

Workers will be created when begin() is called:

channel.begin()

In begin(), this class gives each worker a progressReporter, which is created by the progressMonitor.

Now you are ready to send a task. A task is a function or any other object that is callable and picklable, and it can take any number of arguments. If a task takes a named argument progressReporter, the worker passes its progressReporter to the task. The value that a task returns is the result of the task and must also be picklable. For example, an instance of EventLoop can be a task. You can send a task with the method put:

channel.put(task1, 10, 20, A=30)

Here, 10, 20, A=30 are the arguments to the task.
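The picklability requirement can be checked before a task is put. The following is a minimal sketch using only the standard library; the helper is_picklable and the two example tasks are hypothetical and not part of alphatwirl.

```python
import functools
import pickle


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except Exception:
        # pickle reports failures with several exception types,
        # so this sketch treats any error as "not picklable".
        return False
    return True


# A partial of a built-in function is both callable and picklable.
good_task = functools.partial(pow, 2)

# A lambda is callable, but the standard pickle module cannot serialize it.
bad_task = lambda x: x * 2
```

A task that fails this check, such as the lambda above, can usually be rewritten as a module-level function or as an instance of a module-level class with a __call__ method.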

This class sends the task to a worker. The worker that receives the task first tries to call the task with the progressReporter in addition to the arguments. If the task doesn’t accept the progressReporter, the worker calls it with the arguments only.
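The dispatch rule can be illustrated with a short sketch. It is not the worker's actual code: the function call_task is hypothetical, and it inspects the task's signature up front instead of trying one call and then retrying, which is an assumption made here so that errors raised inside the task are not mistaken for a missing argument.

```python
import inspect


def call_task(task, progressReporter, *args, **kwargs):
    """Call task, passing progressReporter only if the task names it."""
    try:
        params = inspect.signature(task).parameters
    except (TypeError, ValueError):
        # Some callables do not expose a signature; treat them as
        # tasks that do not take a progressReporter.
        params = {}
    if 'progressReporter' in params:
        return task(*args, progressReporter=progressReporter, **kwargs)
    return task(*args, **kwargs)
```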

You can send multiple tasks:

channel.put(task2)
channel.put(task3, 100, 200)
channel.put(task4, A='abc')
channel.put(task5)

They will be executed by workers.

You can receive the results of the tasks with the method receive():

results = channel.receive()

This method will wait until all tasks are finished. If a task gives a progressReport to the progressReporter, the report will be used, for example, to update progress bars on the screen.

When all tasks have finished, receive() returns results, a list of the results of the tasks, sorted in the order in which the tasks were originally put.
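Tasks that run concurrently may finish in any order, so returning results in put order requires remembering each task's position. The sketch below shows one way to do this; it swaps in a thread pool from concurrent.futures for the worker processes, and the function run_in_put_order is hypothetical rather than part of this package.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_in_put_order(calls, max_workers=4):
    """Run (task, args, kwargs) triples and return results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future to the index at which its task was submitted.
        futures = {
            pool.submit(task, *args, **kwargs): index
            for index, (task, args, kwargs) in enumerate(calls)
        }
        # Collect results in completion order, tagged with their index.
        finished = [(futures[f], f.result()) for f in as_completed(futures)]
    # Restore the original submission order.
    finished.sort(key=lambda pair: pair[0])
    return [result for _, result in finished]
```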

After receiving the results, you can put more tasks:

channel.put(task6)
channel.put(task7)

And you can receive the results of them:

results = channel.receive()

If there are no more tasks to be done, you should call the method end:

channel.end()

This will end all background processes.
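To make sure end() is called even if an exception interrupts the work, the begin()/end() pair can be wrapped in a context manager. The helper below is a sketch, not part of alphatwirl: the name opened is hypothetical, and it assumes only that the channel provides the begin() and end() methods listed for this class.

```python
from contextlib import contextmanager


@contextmanager
def opened(channel):
    """Call channel.begin() on entry and channel.end() on exit."""
    channel.begin()
    try:
        yield channel
    finally:
        # Runs whether the body finished normally or raised.
        channel.end()
```

With this helper, the calls to put and receive go inside a `with opened(channel) as ch:` block.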

begin()[source]
put(task, *args, **kwargs)[source]
receive()[source]
terminate()[source]
end()[source]

alphatwirl.concurrently.CommunicationChannel0 module

class alphatwirl.concurrently.CommunicationChannel0.CommunicationChannel0(progressMonitor=None)[source]

Bases: object

A communication channel for the single-process mode.

This class is an alternative to CommunicationChannel. Unlike CommunicationChannel, it does not send tasks to workers; instead, it executes the tasks directly.

This class has the same interface as the class CommunicationChannel. When this class is used as a substitute for CommunicationChannel, the tasks will be sequentially executed in the foreground.
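The idea of a single-process channel with the same method names can be sketched as follows. The class SequentialChannel is a hypothetical stand-in, not the implementation of CommunicationChannel0; in particular, running each task inside put() rather than inside receive() is an assumption of this sketch.

```python
class SequentialChannel:
    """A stand-in channel that runs every task in the calling process."""

    def __init__(self):
        self._results = []

    def begin(self):
        pass  # no workers to start in single-process mode

    def put(self, task, *args, **kwargs):
        # Execute immediately in the foreground and keep the result.
        self._results.append(task(*args, **kwargs))

    def receive(self):
        # Hand back the results in put order and reset the buffer.
        results, self._results = self._results, []
        return results

    def terminate(self):
        pass  # nothing runs in the background

    def end(self):
        pass  # no workers to stop
```

Because the method names match, code written against the multiprocess channel can be run against such a stand-in when debugging, where tracebacks from a single process are easier to read.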

begin()[source]
put(task, *args, **kwargs)[source]
receive()[source]
terminate()[source]
end()[source]

alphatwirl.concurrently.HTCondorJobSubmitter module

class alphatwirl.concurrently.HTCondorJobSubmitter.HTCondorJobSubmitter(job_desc_extra=[])[source]

Bases: object

run(workingArea, package_index)[source]
poll()[source]

Check whether the jobs are running and return a list of cluster IDs for finished jobs.

wait()[source]

Wait until all jobs finish and return a list of cluster IDs.

failed_runids(runids)[source]
terminate()[source]
alphatwirl.concurrently.HTCondorJobSubmitter.try_executing_until_succeed(procargs)[source]
alphatwirl.concurrently.HTCondorJobSubmitter.query_status_for(ids)[source]
alphatwirl.concurrently.HTCondorJobSubmitter.change_job_priority(ids, priority=10)[source]
alphatwirl.concurrently.HTCondorJobSubmitter.sample_ids(n=-1)[source]

alphatwirl.concurrently.MultiprocessingDropbox module

class alphatwirl.concurrently.MultiprocessingDropbox.MultiprocessingDropbox(nprocesses=16, progressMonitor=None)[source]

Bases: object

open()[source]
put(package)[source]
receive()[source]
terminate()[source]
close()[source]

alphatwirl.concurrently.SubprocessRunner module

class alphatwirl.concurrently.SubprocessRunner.SubprocessRunner(pipe=False)[source]

Bases: object

run(workingArea, package_index)[source]
poll()[source]

Check whether the jobs are running and return a list of pids for finished jobs.

wait()[source]

Wait until all jobs finish and return a list of pids.

failed_runids(runids)[source]
terminate()[source]
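The relationship between poll() and wait() described above can be sketched with the standard subprocess module. The class TinyRunner is hypothetical and is not the implementation of SubprocessRunner; in particular, its run() takes a command line directly, whereas the documented run() takes workingArea and package_index.

```python
import subprocess
import sys
import time


class TinyRunner:
    """A stand-in runner that tracks child processes by pid."""

    def __init__(self):
        self._running = {}  # pid -> subprocess.Popen

    def run(self, argv):
        proc = subprocess.Popen(argv)
        self._running[proc.pid] = proc
        return proc.pid

    def poll(self):
        # Popen.poll() returns None while the child is still running.
        finished = [pid for pid, proc in self._running.items()
                    if proc.poll() is not None]
        for pid in finished:
            del self._running[pid]
        return finished

    def wait(self, sleep=0.1):
        # Poll repeatedly until no child process is left.
        finished = []
        while self._running:
            finished.extend(self.poll())
            if self._running:
                time.sleep(sleep)
        return finished


# Usage: start two short-lived Python processes and wait for both.
runner = TinyRunner()
pids = [runner.run([sys.executable, "-c", "pass"]) for _ in range(2)]
finished = runner.wait()
```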

alphatwirl.concurrently.TaskPackage module

class alphatwirl.concurrently.TaskPackage.TaskPackage(task, args, kwargs)

Bases: tuple

args

Alias for field number 1

kwargs

Alias for field number 2

task

Alias for field number 0
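The field layout above matches what collections.namedtuple produces. The sketch below recreates an equivalent type locally so that it runs without alphatwirl installed; the helper execute is hypothetical and only illustrates how the three fields fit together.

```python
import collections

# Local stand-in with the same field order: task (0), args (1), kwargs (2).
TaskPackage = collections.namedtuple('TaskPackage', 'task args kwargs')


def execute(package):
    """Call the packaged task with its positional and keyword arguments."""
    return package.task(*package.args, **package.kwargs)


package = TaskPackage(task=pow, args=(2, 5), kwargs={})
result = execute(package)
```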

alphatwirl.concurrently.TaskPackageDropbox module

class alphatwirl.concurrently.TaskPackageDropbox.TaskPackageDropbox(workingArea, dispatcher, sleep=5)[source]

Bases: object

A drop box for task packages.

It puts task packages in a working area and dispatches runners that execute the tasks.

open()[source]
put(package)[source]
receive()[source]
terminate()[source]
close()[source]

alphatwirl.concurrently.Worker module

class alphatwirl.concurrently.Worker.Worker(task_queue, result_queue, lock, progressReporter)[source]

Bases: multiprocessing.process.Process

run()[source]
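A worker of this kind is typically a loop that takes tasks from one queue and puts results on another. The sketch below shows that pattern only; it is not the code of this class. The class SketchWorker, the (index, task, args, kwargs) message layout, and the use of None as a stop signal are all assumptions, and the lock and progressReporter arguments of the documented constructor are left out.

```python
import multiprocessing
import queue


class SketchWorker(multiprocessing.Process):
    """A stand-in worker: read tasks from one queue, write results to another."""

    def __init__(self, task_queue, result_queue):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            message = self.task_queue.get()
            if message is None:
                break  # hypothetical stop signal
            index, task, args, kwargs = message
            self.result_queue.put((index, task(*args, **kwargs)))


# In-process check: calling run() directly (instead of start()) executes the
# loop in the current process, so plain queue.Queue objects are enough here.
tasks, results = queue.Queue(), queue.Queue()
tasks.put((0, pow, (2, 3), {}))
tasks.put(None)
SketchWorker(tasks, results).run()
first = results.get()
```

When such a worker is started with start(), the queues would need to be multiprocessing queues, and the tasks and results would need to be picklable.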

alphatwirl.concurrently.WorkingArea module

class alphatwirl.concurrently.WorkingArea.WorkingArea(dir, python_modules)[source]

Bases: object

Args:

dir (str): A path to a directory in which a new directory will be created.

open()[source]
put_package(package)[source]
package_path(package_index)[source]
collect_result(package_index)[source]
close()[source]
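Creating a fresh directory inside a given parent directory can be done with the standard tempfile module. The sketch below is one way to do it and is not taken from WorkingArea; the function make_working_area and the prefix 'tpd_' are hypothetical.

```python
import os
import tempfile


def make_working_area(parent_dir, prefix='tpd_'):
    """Create a new, uniquely named directory inside parent_dir."""
    # Make sure the parent exists before creating the new directory in it.
    os.makedirs(parent_dir, exist_ok=True)
    return tempfile.mkdtemp(prefix=prefix, dir=parent_dir)
```

Each call returns a different path, so repeated runs that share the same parent directory do not overwrite each other's files.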

alphatwirl.concurrently.example_load module

alphatwirl.concurrently.example_load_result module

alphatwirl.concurrently.example_submit_htcondor module

alphatwirl.concurrently.run module

Module contents