258 lines
14 KiB
Python
258 lines
14 KiB
Python
import io
|
|
import re
|
|
from .JobsState import JobsState
|
|
from .QueueMachine import QueueMachine, QueueMachineStateFlags
|
|
from .Util import *
|
|
from .Log import logError
|
|
from .Job import JobStateFlags, JobId, Job, ParallelEnvironment
|
|
import logging
|
|
import time
|
|
|
|
|
|
class QstatParser:
    """Parser for the textual output of the grid engine's ``qstat`` command.

    It turns the output of 'qstat -f -u \\* -pri' into a JobsState
    (see parseQstatOutput) and reads the output of 'qstat -j <jobid>'
    to complete a Job (see parseJobDetails).
    """

    def parseJobState(self, strJobStatus):
        """Converts a qstat job status string into a sum of JobStateFlags.

        :param str strJobStatus: the job status as printed by qstat, made of one letter per flag (eg 'r' or 'qw')
        :return: the sum of the JobStateFlags values matching each letter (0 for an empty string)
        :raises AssertionError: if a letter is not one of the handled flags
        """
        jobState = 0
        for c in strJobStatus:
            if c == 'r':
                jobState += JobStateFlags.RUNNING
            elif c == 'w':
                jobState += JobStateFlags.WAITING
            elif c == 'q':
                jobState += JobStateFlags.QUEUED
            elif c == 't':
                jobState += JobStateFlags.TRANSFERING
            elif c == 'd':
                jobState += JobStateFlags.DELETED
            elif c == 'h':
                jobState += JobStateFlags.HOLD
            elif c == 's':
                jobState += JobStateFlags.SUSPENDED
            elif c == 'E':
                jobState += JobStateFlags.ERROR
            else:
                assert False, 'unhandled job state flag :"' + c + '"'
        return jobState

    def parseQueueMachineState(self, strQueueMachineStatus):
        """Converts a qstat queue machine status string into a sum of QueueMachineStateFlags.

        :param str strQueueMachineStatus: the content of the 'states' column of a queue machine line, made of one letter per flag (may be empty)
        :return: the sum of the QueueMachineStateFlags values matching each letter (0 for an empty string)
        :raises AssertionError: if a letter is not one of the handled flags
        """
        queueMachineState = 0
        for c in strQueueMachineStatus:
            if c == 'd':
                queueMachineState += QueueMachineStateFlags.DISABLED
            elif c == 'a':
                queueMachineState += QueueMachineStateFlags.ALARM
            elif c == 'u':
                queueMachineState += QueueMachineStateFlags.UNKNOWN
            elif c == 'E':
                queueMachineState += QueueMachineStateFlags.ERROR
            elif c == 'o':
                queueMachineState += QueueMachineStateFlags.OBSOLETE
            elif c == 's':
                queueMachineState += QueueMachineStateFlags.SUSPENDED
            else:
                assert False, 'unhandled queue machine state flag :"' + c + '"'
        return queueMachineState

    @staticmethod
    def _parse_pending_tasks(task_ranges_sequence, line=''):
        """Parses a job's task ids encoded in the form of a string containing a sequence of ranges.

        :param str task_ranges_sequence: a job's task ids encoded in the form of a string containing a sequence of non overlapping ranges separated with a comma. Each range is expected to be either a single index ("7") or in the form "<min_index>-<max_index>:<step>"
        :param str line: the qstat line being parsed; only used to give context in the error message
        :return list(int): the list of task ids
        :raises AssertionError: if one of the ranges doesn't have the expected format

        for example, this function would return [1, 2, 3, 4, 6, 7, 8] for the input string "1-4:1,6-8:1"
        """
        task_ids = []
        for task_range in task_ranges_sequence.split(','):
            # tolerate blanks around a range (eg trailing spaces at the end of the qstat line)
            task_range = task_range.strip()
            single_index_match = re.match(r'^(?P<elementIndex>[0-9]+)$', task_range)
            if single_index_match:
                task_ids.append(int(single_index_match.group('elementIndex')))
                continue
            # we expect task_range to be of the form "1-4:1", where :
            # the 1st number is the min element index (sge imposes it to be greater than 0)
            # the 2nd number is the max element index
            # the 3rd number is the step between consecutive element indices
            range_match = re.match(r'^(?P<minElementIndex>[0-9]+)-(?P<maxElementIndex>[0-9]+):(?P<stepBetweenIndices>[0-9]+)$', task_range)
            if range_match is None:
                message = 'unexpected format for job array details : "%s" (line="%s")' % (task_range, line)
                logError(message)
                # raised explicitly so that the failure is not silenced when python runs with -O
                raise AssertionError(message)
            min_element_index = int(range_match.group('minElementIndex'))
            max_element_index = int(range_match.group('maxElementIndex'))
            step_between_indices = int(range_match.group('stepBetweenIndices'))
            # the max index is part of the range, hence the + 1
            task_ids.extend(range(min_element_index, max_element_index + 1, step_between_indices))
        return task_ids

    def parseQstatOutput(self, qstatOutput, cluster_domain: str = 'ipr.univ-rennes1.fr'):
        """
        parses result of command 'qstat -f -u \\* -pri'

        :param str qstatOutput: the text output by the qstat command
        :param str cluster_domain: network domain of the cluster (eg 'ipr.univ-rennes.fr'). This information is missing from qstat's output and is used to form the fully qualified domain name of the cluster machines.
        :return: a JobsState containing the queue machines and the jobs (running and pending) found in qstatOutput
        :raises AssertionError: if a line has an unexpected format
        """
        logging.debug('qstatOutput type : %s' % type(qstatOutput))

        # ugly hack to work around the fact that qstat truncates the fqdn of cluster nodes
        # graffy@physix-master:~$ qstat -f -u \*
        # queuename qtype resv/used/tot. load_avg arch states
        # ---------------------------------------------------------------------------------
        # main.q@physix88.ipr.univ-renne BIP 0/0/36 14.03 lx-amd64
        # TODO: fix this properly by parsing the output of 'qstat -f -u \* -xml' instead of 'qstat -f -u \*'
        qstatOutput = re.sub(r'\.ipr\.univ[^ ]*', f'.{cluster_domain}', qstatOutput)

        jobsState = JobsState()
        currentQueueMachine = None  # the queue machine the running job lines currently belong to
        bInPendingJobsSection = False

        # examples of job line :
        # 43521 0.55108 Confidiso3 aghoufi r 08/19/2009 18:40:09 1
        # a typical job line in the pending jobs section looks like this :
        # 43645 0.00000 LC_LV_MC aghoufi qw 08/21/2009 08:14:58 1
        # a typical running job array line looks like this
        # 43619 0.56000 SimpleJobA raffy r 08/20/2009 18:13:03 1 3
        # a typical job array line in the pending jobs section looks like this
        # 43646 0.00000 SimpleJobA raffy qw 08/21/2009 09:56:40 1 1-4:1

        # nurg The job's total urgency value in normalized fashion.
        # npprior The job's -p priority in normalized fashion.
        # ntckts The job's ticket amount in normalized fashion.
        # ppri The job's -p priority as specified by the user.
        jobRegularExp = re.compile(r'^[ ]*(?P<jobId>[^ ]+)[ ]+(?P<JobPriority>[0-9.]+)[ ]+(?P<nurg>[0-9.]+)[ ]+(?P<npprior>[0-9.]+)[ ]+(?P<ntckts>[0-9.]+)[ ]+(?P<ppri>-?[0-9]+)[ ]+(?P<jobScriptName>[^ ]+)[ ]+(?P<jobOwner>[^ ]+)[ ]+(?P<jobStatus>[^ ]+)[ ]+(?P<jobStartOrSubmitTime>[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+(?P<numSlots>[0-9]+)[ ]+(?P<jobArrayDetails>[^\n]*)[\s]*$')
        # example of machine line :
        # allintel.q@simpatix34.univ-ren BIP 0/6/8 6.00 darwin-x86
        machineRegularExp = re.compile(r'^(?P<queueName>[^@]+)@(?P<machineName>[^ ]+)[ ]+(?P<queueTypeString>[^ ]+)[ ]+(?P<numReservedSlots>[^/]+)/(?P<numUsedSlots>[^/]+)/(?P<numTotalSlots>[^ ]+)[ ]+(?P<cpuLoad>[^ ]+)[\s]+(?P<archName>[^ ]+)[\s]+(?P<queueMachineStatus>[^\s]*)')
        pendingJobsHeaderRegularExp = re.compile('^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS[?]*')

        with io.StringIO(qstatOutput) as f:
            for line in f:
                # check if the current line is a line describing a job
                matchObj = jobRegularExp.match(line)
                if matchObj:
                    # we are dealing with a job line
                    if not bInPendingJobsSection:
                        # a running job line always follows the line of the queue machine it runs on
                        assert currentQueueMachine
                    iJobId = int(matchObj.group('jobId'))
                    logging.debug('iJobId = %d' % iJobId)
                    jobState = self.parseJobState(matchObj.group('jobStatus'))
                    strJobArrayDetails = matchObj.group('jobArrayDetails')
                    bIsJobArray = (len(strJobArrayDetails) != 0)
                    # each element of a job array is treated as a separate job for the sake of simplicity.
                    # For these elements, the job id in sge sense is the same, but they are different in this program's sense
                    task_ids = range(0, 1)  # just one element, unless it's a job array
                    if bIsJobArray:
                        if bInPendingJobsSection:
                            task_ids = self._parse_pending_tasks(strJobArrayDetails, line)
                        else:
                            # we are in the running jobs section, and here we expect the strJobArrayDetails to just contain the index of the job array element
                            iJobArrayElementIndex = int(strJobArrayDetails)
                            assert iJobArrayElementIndex != 0  # sge does not allow element indices to be 0
                            task_ids = range(iJobArrayElementIndex, iJobArrayElementIndex + 1)
                    logging.debug('task_ids = %s' % task_ids)
                    for task_id in task_ids:
                        logging.debug('task_id = %s' % task_id)
                        jobId = JobId(iJobId, task_id) if bIsJobArray else JobId(iJobId)
                        job = jobsState.getJob(jobId)
                        if job is None:
                            # this job hasn't been encountered yet in the output of qstat ...
                            # we could either be in the pending jobs section or in the running jobs section
                            job = Job(jobId)
                            jobsState.addJob(job)
                            job.setState(jobState)
                            strJobStartOrSubmitTime = matchObj.group('jobStartOrSubmitTime')
                            jobStartOrSubmitTime = time.strptime(strJobStartOrSubmitTime, '%m/%d/%Y %H:%M:%S')
                            # the date column holds the submit time for pending jobs and the start time for running jobs
                            if bInPendingJobsSection:
                                job.setSubmitTime(jobStartOrSubmitTime)
                            else:
                                job.setStartTime(jobStartOrSubmitTime)
                            job.setOwner(matchObj.group('jobOwner'))
                            job.setScriptName(matchObj.group('jobScriptName'))
                            if bInPendingJobsSection:
                                job.setNumRequiredSlots(int(matchObj.group('numSlots')))
                        else:
                            assert not bInPendingJobsSection  # if we are in the pending jobs section, the job should be new
                        if not bInPendingJobsSection:
                            # a running job may appear under several queue machines : accumulate the slots it uses on each of them
                            job.addSlots(currentQueueMachine.getName(), int(matchObj.group('numSlots')))
                elif not bInPendingJobsSection:
                    # the current line does not describe a job : check if it describes the status of a machine
                    matchObj = machineRegularExp.match(line)
                    if matchObj:
                        queueMachine = QueueMachine(matchObj.group('queueName'), matchObj.group('machineName'))
                        queueMachine.setNumSlots(int(matchObj.group('numTotalSlots')))
                        queueMachine.setNumUsedSlots(int(matchObj.group('numUsedSlots')))
                        strCpuLoad = matchObj.group('cpuLoad')
                        # qstat prints '-NA-' when the load is not available : the cpu load is then left unset
                        if strCpuLoad != '-NA-':
                            queueMachine.setCpuLoad(float(strCpuLoad))
                        strQueueMachineState = matchObj.group('queueMachineStatus')
                        queueMachine.setState(self.parseQueueMachineState(strQueueMachineState))
                        currentQueueMachine = queueMachine
                        jobsState.addQueueMachine(queueMachine)
                    elif pendingJobsHeaderRegularExp.match(line):
                        # from now on, job lines describe pending jobs, which are not attached to any machine
                        bInPendingJobsSection = True
                        currentQueueMachine = None
                    # any other line (column headers, separators...) is ignored
                else:
                    # we are in the pending jobs section : besides job lines, only lines made of '#' are expected
                    if not re.match('^[#]+$', line):
                        # unexpected line
                        print('line = "' + line + '"')
                        assert False
        return jobsState

    def parseJobDetails(self, qstatOutput, job):
        """
        adds to job the details parsed from the output of the "qstat -j <jobid>" command

        :param str qstatOutput: the text output by the "qstat -j <jobid>" command
        :param job: the job to complete; its m_jobRequirements attribute is updated with the allowed queues and the parallel environment
        :raises AssertionError: if the job number doesn't match job's id or if the parallel environment is not handled
        """
        # NOTE(review): the fieldValue group '[?]*' only matches a (possibly empty) sequence of
        # literal question marks; as it is followed by '$', lines such as 'job_number:   12345'
        # never match and are ignored. The intent was probably '.*', but widening the pattern
        # would activate the assertions below on real data, so this needs to be confirmed with
        # real 'qstat -j' outputs before being changed.
        fieldRegularExp = re.compile('^(?P<fieldName>[^:]+):[ ]+(?P<fieldValue>[?]*)$')
        with io.StringIO(qstatOutput) as f:
            for line in f:
                # check if the current line is of the form "<field name>: <field value>"
                matchObj = fieldRegularExp.match(line)
                if not matchObj:
                    continue
                fieldName = matchObj.group('fieldName')
                strFieldValue = matchObj.group('fieldValue')
                if fieldName == 'job_number':
                    assert job.getId().asStr() == strFieldValue
                elif fieldName == 'hard_queue_list':
                    allowedQueues = strFieldValue.split(',')
                    assert len(allowedQueues) > 0
                    job.m_jobRequirements.m_queues = allowedQueues
                elif fieldName == 'parallel environment':
                    # the value could be 'ompi range: 32'
                    matchObj = re.match('ompi range: (?P<numSlots>[0-9]+)[?]*', strFieldValue)
                    if matchObj:
                        job.m_jobRequirements.m_parallelEnvironment = ParallelEnvironment.MPI
                    else:
                        assert False
                # the other fields are ignored
|