Source code for alphatwirl.concurrently.HTCondorJobSubmitter

#!/usr/bin/env python
# Tai Sakuma <sakuma@cern.ch>
import os, sys
import subprocess
import collections
import time
import textwrap
import getpass
import re
import logging

import alphatwirl
from alphatwirl.misc.deprecation import _deprecated_class_method_option

from .exec_util import try_executing_until_succeed, compose_shortened_command_for_logging

##__________________________________________________________________||
# https://htcondor-wiki.cs.wisc.edu/index.cgi/wiki?p=MagicNumbers
HTCONDOR_JOBSTATUS = {
    0: "Unexpanded",
    1: "Idle",
    2: "Running",
    3: "Removed",
    4: "Completed",
    5: "Held",
    6: "Transferring_Output",
    7: "Suspended"
}
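
## For example (an illustrative use of the mapping above; poll() below
## uses it to summarize condor_q status codes in log messages):
##
## >>> HTCONDOR_JOBSTATUS[2]
## 'Running'
## >>> HTCONDOR_JOBSTATUS[5]
## 'Held'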

##__________________________________________________________________||
## HTCondor Manual:
## 2.5 Submitting a Job
## http://research.cs.wisc.edu/htcondor/manual/v8.4/2_5Submitting_Job.html
##
## condor_submit command manual
## including complete description of submit description file
## http://research.cs.wisc.edu/htcondor/manual/v8.4/condor_submit.html#man-condor-submit

## Keys in this dict should be in lower case; user-supplied keys are
## lower-cased in __init__() below.
DEFAULT_JOB_DESC_DICT = collections.OrderedDict([
    ('executable', 'run.py'),
    ('output', 'results/$(resultdir)/stdout.$(cluster).$(process).txt'),
    ('error', 'results/$(resultdir)/stderr.$(cluster).$(process).txt'),
    ('log', 'results/$(resultdir)/log.$(cluster).$(process).txt'),
    ('arguments', '$(resultdir).p.gz'),
    ('should_transfer_files', 'YES'),
    ('when_to_transfer_output', 'ON_EXIT'),
    ('transfer_input_files', '$(resultdir).p.gz'),
    ('transfer_output_files', 'results'),
    ('universe', 'vanilla'),
    ('notification', 'Error'),
    ('getenv', 'True'),
])
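
## For illustration only: run_multiple() below joins this dict into the
## submit description that is piped to condor_submit, one 'key = value'
## line per entry, followed by a queue statement naming the result
## directories. With the defaults above and a hypothetical result
## directory 'task_00', the description would read:
##
##   executable = run.py
##   output = results/$(resultdir)/stdout.$(cluster).$(process).txt
##   error = results/$(resultdir)/stderr.$(cluster).$(process).txt
##   ...
##   getenv = True
##   queue resultdir in task_00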

##__________________________________________________________________||
class HTCondorJobSubmitter(object):

    @_deprecated_class_method_option('job_desc_extra', msg='use job_desc_dict instead')
    def __init__(self, job_desc_extra=[ ], job_desc_dict={ }):

        self.job_desc_dict = DEFAULT_JOB_DESC_DICT.copy()

        # not using update() in case job_desc_dict is ordered
        for k, v in job_desc_dict.items():
            self.job_desc_dict[k.lower()] = v

        self.user_job_desc_dict = job_desc_dict # for test

        self.job_desc_extra = job_desc_extra # TODO: to be deleted

        self.clusterprocids_outstanding = [ ]
        self.clusterprocids_finished = [ ]

    def run(self, workingArea, package_index):
        return self.run_multiple(workingArea, [package_index])[0]

    def run_multiple(self, workingArea, package_indices):

        if not package_indices:
            return [ ]

        cwd = os.getcwd()
        os.chdir(workingArea.path)

        package_paths = [workingArea.package_path(i) for i in package_indices]
        resultdir_basenames = [os.path.splitext(p)[0] for p in package_paths]
        resultdir_basenames = [os.path.splitext(n)[0] for n in resultdir_basenames]
        resultdirs = [os.path.join('results', n) for n in resultdir_basenames]

        for d in resultdirs:
            alphatwirl.mkdir_p(d)

        self.job_desc_dict['executable'] = workingArea.executable

        extra_input_files = sorted(list(workingArea.extra_input_files))
        if extra_input_files:
            self.job_desc_dict['transfer_input_files'] += ', ' + ', '.join(extra_input_files)

        job_desc = '\n'.join(['{} = {}'.format(k, v) for k, v in self.job_desc_dict.items()])
        job_desc_queue_line = 'queue resultdir in {}'.format(', '.join(resultdir_basenames))

        # TODO: delete this line as job_desc_extra will be obsolete
        job_desc = '\n'.join([job_desc] + self.job_desc_extra)

        job_desc = '\n'.join([job_desc, job_desc_queue_line])

        procargs = ['condor_submit']

        logger = logging.getLogger(__name__)
        command_display = compose_shortened_command_for_logging(procargs)
        logger.debug('execute: {!r}'.format(command_display))

        proc = subprocess.Popen(
            procargs,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        # the pipes are opened in bytes mode; encode the job description
        # before writing it to stdin
        stdout, stderr = proc.communicate(job_desc.encode())
        stdout = stdout.decode()
        stderr = stderr.decode()

        for l in stdout.rstrip().split('\n'):
            logger.debug(l)

        regex = re.compile(r"(\d+) job\(s\) submitted to cluster (\d+)", re.MULTILINE)
        match = regex.search(stdout)
        njobs = int(match.groups()[0])
        clusterid = match.groups()[1] # e.g., '3158626'

        change_job_priority([clusterid], 10) ## need to make configurable

        procid = ['{}'.format(i) for i in range(njobs)]
        # e.g., ['0', '1', '2', '3']

        clusterprocids = ['{}.{}'.format(clusterid, i) for i in procid]
        # e.g., ['3158626.0', '3158626.1', '3158626.2', '3158626.3']

        self.clusterprocids_outstanding.extend(clusterprocids)

        os.chdir(cwd)

        return clusterprocids

    def poll(self):
        """check if the jobs are running and return a list of cluster IDs
        for finished jobs
        """

        clusterids = clusterprocids2clusterids(self.clusterprocids_outstanding)
        clusterprocid_status_list = query_status_for(clusterids)
        # e.g., [['1730126.0', 2], ['1730127.0', 2], ['1730129.1', 1], ['1730130.0', 1]]

        if clusterprocid_status_list:
            clusterprocids, statuses = zip(*clusterprocid_status_list)
        else:
            clusterprocids, statuses = (), ()

        clusterprocids_finished = [i for i in self.clusterprocids_outstanding if i not in clusterprocids]
        self.clusterprocids_finished.extend(clusterprocids_finished)
        self.clusterprocids_outstanding[:] = clusterprocids

        # logging
        counter = collections.Counter(statuses)
        messages = [ ]
        if counter:
            messages.append(', '.join(['{}: {}'.format(HTCONDOR_JOBSTATUS[k], counter[k]) for k in counter.keys()]))
        if self.clusterprocids_finished:
            messages.append('Finished {}'.format(len(self.clusterprocids_finished)))
        logger = logging.getLogger(__name__)
        logger.info(', '.join(messages))

        return clusterprocids_finished

    def wait(self):
        """wait until all jobs finish and return a list of cluster IDs
        """
        sleep = 5
        while self.clusterprocids_outstanding:
            self.poll()
            time.sleep(sleep)
        return self.clusterprocids_finished

    def failed_runids(self, runids):
        # remove failed clusterprocids from self.clusterprocids_finished
        # so that len(self.clusterprocids_finished) becomes the number
        # of the successfully finished jobs
        for i in runids:
            try:
                self.clusterprocids_finished.remove(i)
            except ValueError:
                pass

    def terminate(self):
        clusterids = clusterprocids2clusterids(self.clusterprocids_outstanding)
        ids_split = split_ids(clusterids)
        for ids_sub in ids_split:
            procargs = ['condor_rm'] + ids_sub
            command_display = compose_shortened_command_for_logging(procargs)
            logger = logging.getLogger(__name__)
            logger.debug('execute: {}'.format(command_display))
            proc = subprocess.Popen(
                procargs,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            stdout, stderr = proc.communicate()
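
##__________________________________________________________________||
## Example usage (an illustrative sketch, not part of the original
## module). 'work_area' stands in for an alphatwirl working-area object
## providing the attributes used above: 'path', 'executable',
## 'extra_input_files', and 'package_path()'. The package indices and
## the extra submit option are hypothetical.
##
## >>> submitter = HTCondorJobSubmitter(job_desc_dict={'request_memory': '2048'})
## >>> runids = submitter.run_multiple(work_area, [0, 1, 2])
## >>> finished = submitter.wait() # polls condor_q every 5 seconds until done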
##__________________________________________________________________||
def clusterprocids2clusterids(clusterprocids):
    return list(set([i.split('.')[0] for i in clusterprocids]))

##__________________________________________________________________||
def query_status_for(ids, n_at_a_time=500):

    ids_split = split_ids(ids, n=n_at_a_time)
    stdout = [ ]
    for ids_sub in ids_split:
        procargs = ['condor_q'] + ids_sub + ['-format', '%d.', 'ClusterId', '-format', '%d ', 'ProcId', '-format', '%-2s\n', 'JobStatus']
        stdout.extend(try_executing_until_succeed(procargs))
    # e.g., stdout = ['688244.0 1 ', '688245.0 1 ', '688246.0 2 ']

    ret = [l.strip().split() for l in stdout]
    # e.g., [['688244.0', '1'], ['688245.0', '1'], ['688246.0', '2']]

    ret = [[e[0], int(e[1])] for e in ret] # a list of [clusterprocid, status]
    # e.g., [['688244.0', 1], ['688245.0', 1], ['688246.0', 2]]

    return ret

##__________________________________________________________________||
def change_job_priority(ids, priority=10, n_at_a_time=500):
    # http://research.cs.wisc.edu/htcondor/manual/v7.8/2_6Managing_Job.html#sec:job-prio
    ids_split = split_ids(ids, n=n_at_a_time)
    for ids_sub in ids_split:
        procargs = ['condor_prio', '-p', str(priority)] + ids_sub
        try_executing_until_succeed(procargs)

##__________________________________________________________________||
def split_ids(ids, n=500):
    # e.g.,
    # ids = ['3158174', '3158175', '3158176', '3158177', '3158178']
    # n = 2
    # return [['3158174', '3158175'], ['3158176', '3158177'], ['3158178']]
    return [ids[i:(i + n)] for i in range(0, len(ids), n)]

##__________________________________________________________________||
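## A quick illustration of the helpers above (hypothetical IDs):
##
## >>> clusterprocids2clusterids(['688244.0', '688244.1', '688245.0'])
## ['688244', '688245'] # order not guaranteed; the result is built via set()
## >>> split_ids(['3158174', '3158175', '3158176'], n=2)
## [['3158174', '3158175'], ['3158176']]

##__________________________________________________________________||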