Source code for alphatwirl.concurrently.HTCondorJobSubmitter

#!/usr/bin/env python
# Tai Sakuma <sakuma@cern.ch>
import os, sys
import subprocess
import collections
import time
import textwrap
import getpass
import re
import logging

import alphatwirl

from .exec_util import try_executing_until_succeed, compose_shortened_command_for_logging

##__________________________________________________________________||
# https://htcondor-wiki.cs.wisc.edu/index.cgi/wiki?p=MagicNumbers
# Human-readable names for the numeric HTCondor JobStatus codes.
# The codes are consecutive integers starting at 0, so the mapping can
# be built directly from the ordered sequence of names.
HTCONDOR_JOBSTATUS = dict(enumerate((
    "Unexpanded",           # 0
    "Idle",                 # 1
    "Running",              # 2
    "Removed",              # 3
    "Completed",            # 4
    "Held",                 # 5
    "Transferring_Output",  # 6
    "Suspended",            # 7
)))

##__________________________________________________________________||
class HTCondorJobSubmitter(object):
    """Submit packages as HTCondor jobs and track them until completion.

    Jobs are submitted with ``condor_submit``, monitored with ``condor_q``,
    and removed with ``condor_rm``. Cluster-proc IDs (e.g. ``'3158626.0'``)
    identify individual jobs.
    """

    def __init__(self, job_desc_extra=None):
        """Build the condor job description template.

        Parameters
        ----------
        job_desc_extra : list of str, optional
            Extra job-description lines (e.g. ``'request_memory = 1024'``),
            inserted immediately before the final ``queue`` statement.
            Defaults to no extra lines.
        """
        # {input_files} and {resultdirs} are filled in at submission time.
        self.job_desc_template = textwrap.dedent("""
        Executable = run.py
        output = results/$(resultdir)/stdout.txt
        error = results/$(resultdir)/stderr.txt
        log = results/$(resultdir)/log.txt
        Arguments = $(resultdir).p.gz
        should_transfer_files = YES
        when_to_transfer_output = ON_EXIT
        transfer_input_files = {input_files}
        transfer_output_files = results
        Universe = vanilla
        notification = Error
        getenv = True
        queue resultdir in {resultdirs}
        """).strip()
        # FIX: was a mutable default argument (job_desc_extra=[ ]).
        if job_desc_extra:
            lines = self.job_desc_template.split('\n')
            lines[-1:-1] = job_desc_extra  # insert just before the 'queue' line
            self.job_desc_template = '\n'.join(lines)
        self.clusterprocids_outstanding = [ ]
        self.clusterprocids_finished = [ ]

    def run(self, workingArea, package_index):
        """submit a job for one package and return its cluster-proc ID"""
        return self.run_multiple(workingArea, [package_index])[0]

    def run_multiple(self, workingArea, package_indices):
        """submit jobs for the packages and return their cluster-proc IDs

        e.g. ['3158626.0', '3158626.1', '3158626.2']
        """
        if not package_indices:
            return [ ]

        cwd = os.getcwd()
        os.chdir(workingArea.path)
        # FIX: restore the working directory even if submission fails.
        try:
            package_paths = [workingArea.package_path(i) for i in package_indices]
            # strip two extensions, e.g. 'task_00003.p.gz' -> 'task_00003'
            resultdir_basenames = [os.path.splitext(p)[0] for p in package_paths]
            resultdir_basenames = [os.path.splitext(n)[0] for n in resultdir_basenames]
            resultdirs = [os.path.join('results', n) for n in resultdir_basenames]

            for d in resultdirs:
                alphatwirl.mkdir_p(d)

            extra_input_files = ['python_modules.tar.gz', 'logging_levels.json.gz']
            extra_input_files = [f for f in extra_input_files if os.path.exists(f)]

            job_desc = self.job_desc_template.format(
                input_files=', '.join(['$(resultdir).p.gz'] + extra_input_files),
                resultdirs=', '.join(resultdir_basenames)
            )

            procargs = ['condor_submit']

            logger = logging.getLogger(__name__)
            command_display = compose_shortened_command_for_logging(procargs)
            logger.debug('execute: {!r}'.format(command_display))

            proc = subprocess.Popen(
                procargs,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            # FIX: the pipes are in binary mode (stdout is decoded below),
            # so the job description must be encoded to bytes for stdin.
            stdout, stderr = proc.communicate(job_desc.encode())
            stdout = stdout.decode()
            stderr = stderr.decode()

            for l in stdout.rstrip().split('\n'):
                logger.debug(l)

            # FIX: raw string (avoids invalid escape sequences); search once.
            regex = re.compile(r"(\d+) job\(s\) submitted to cluster (\d+)", re.MULTILINE)
            match = regex.search(stdout)
            njobs = int(match.groups()[0])
            clusterid = match.groups()[1]  # e.g., '3158626'

            change_job_priority([clusterid], 10)  ## need to make configurable

            procid = ['{}'.format(i) for i in range(njobs)]
            # e.g., ['0', '1', '2', '3']

            clusterprocids = ['{}.{}'.format(clusterid, i) for i in procid]
            # e.g., ['3158626.0', '3158626.1', '3158626.2', '3158626.3']

            self.clusterprocids_outstanding.extend(clusterprocids)
        finally:
            os.chdir(cwd)

        return clusterprocids

    def poll(self):
        """check if the jobs are running and return a list of cluster IDs for
        finished jobs
        """
        clusterids = clusterprocids2clusterids(self.clusterprocids_outstanding)
        clusterprocid_status_list = query_status_for(clusterids)
        # e.g., [['1730126.0', 2], ['1730127.0', 2], ['1730129.1', 1], ['1730130.0', 1]]

        if clusterprocid_status_list:
            clusterprocids, statuses = zip(*clusterprocid_status_list)
        else:
            clusterprocids, statuses = (), ()

        # FIX: set membership instead of O(n^2) scans of a tuple
        still_running = set(clusterprocids)
        clusterprocids_finished = [
            i for i in self.clusterprocids_outstanding if i not in still_running
        ]
        self.clusterprocids_finished.extend(clusterprocids_finished)
        self.clusterprocids_outstanding[:] = clusterprocids

        # logging
        counter = collections.Counter(statuses)
        messages = [ ]
        if counter:
            messages.append(', '.join(
                ['{}: {}'.format(HTCONDOR_JOBSTATUS[k], counter[k]) for k in counter.keys()]
            ))
        if self.clusterprocids_finished:
            messages.append('Finished {}'.format(len(self.clusterprocids_finished)))
        logger = logging.getLogger(__name__)
        logger.info(', '.join(messages))

        return clusterprocids_finished

    def wait(self):
        """wait until all jobs finish and return a list of cluster IDs"""
        sleep = 5
        while self.clusterprocids_outstanding:
            self.poll()
            time.sleep(sleep)
        return self.clusterprocids_finished

    def failed_runids(self, runids):
        """discard the given failed job IDs from the finished list

        so that len(self.clusterprocids_finished) becomes the number of the
        successfully finished jobs
        """
        for i in runids:
            try:
                self.clusterprocids_finished.remove(i)
            except ValueError:
                # already absent; nothing to do
                pass

    def terminate(self):
        """remove all outstanding jobs with condor_rm"""
        clusterids = clusterprocids2clusterids(self.clusterprocids_outstanding)
        logger = logging.getLogger(__name__)
        # FIX: dropped the unused 'statuses' accumulator
        for ids_sub in split_ids(clusterids):
            procargs = ['condor_rm'] + ids_sub
            command_display = compose_shortened_command_for_logging(procargs)
            logger.debug('execute: {}'.format(command_display))
            proc = subprocess.Popen(
                procargs,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            stdout, stderr = proc.communicate()
##__________________________________________________________________|| def clusterprocids2clusterids(clusterprocids): return list(set([i.split('.')[0] for i in clusterprocids])) ##__________________________________________________________________|| def query_status_for(ids, n_at_a_time=500): ids_split = split_ids(ids, n=n_at_a_time) stdout = [ ] for ids_sub in ids_split: procargs = ['condor_q'] + ids_sub + ['-format', '%d.', 'ClusterId', '-format', '%d ', 'ProcId', '-format', '%-2s\n', 'JobStatus'] stdout.extend(try_executing_until_succeed(procargs)) # e.g., stdout = ['688244.0 1 ', '688245.0 1 ', '688246.0 2 '] ret = [l.strip().split() for l in stdout] # e.g., [['688244.0', '1'], ['688245.0', '1'], ['688246.0', '2']] ret = [[e[0], int(e[1])] for e in ret] # a list of [clusterprocid, status] # e.g., [['688244.0', 1], ['688245.0', 1], ['688246.0', 2]] return ret ##__________________________________________________________________|| def change_job_priority(ids, priority=10, n_at_a_time=500): # http://research.cs.wisc.edu/htcondor/manual/v7.8/2_6Managing_Job.html#sec:job-prio ids_split = split_ids(ids, n=n_at_a_time) for ids_sub in ids_split: procargs = ['condor_prio', '-p', str(priority)] + ids_sub try_executing_until_succeed(procargs) ##__________________________________________________________________|| def split_ids(ids, n=500): # e.g., # ids = [3158174', '3158175', '3158176', '3158177', '3158178'] # n = 2 # return [[3158174', '3158175'], ['3158176', '3158177'], ['3158178']] return [ids[i:(i + n)] for i in range(0, len(ids), n)] ##__________________________________________________________________||