cocluto/ClusterController/QstatParser.py

212 lines
9.5 KiB
Python

import re
import StringIO
import time

from JobsState import *
from QueueMachine import *
from Util import *
from Log import *
from Job import *
class QstatParser:
    """
    Parser for the text output of sge's qstat command.

    parseQstatOutput() turns a listing of queue machines and jobs into a
    JobsState, and parseJobDetails() completes a job with the fields found
    in the output of "qstat -j <jobid>".

    Malformed input is reported by raising AssertionError explicitly (instead
    of using assert statements), so that the checks survive "python -O".
    """

    # examples of job line :
    # 43521 0.55108 Confidiso3 aghoufi r 08/19/2009 18:40:09 1
    # a typical job line in the pending jobs section looks like this :
    # 43645 0.00000 LC_LV_MC aghoufi qw 08/21/2009 08:14:58 1
    # a typical running job array line looks like this
    # 43619 0.56000 SimpleJobA raffy r 08/20/2009 18:13:03 1 3
    # a typical job array line in the pending jobs section looks like this
    # 43646 0.00000 SimpleJobA raffy qw 08/21/2009 09:56:40 1 1-4:1
    _JOB_REGEXP = re.compile(r'^[ ]*(?P<jobId>[^ ]+)[ ]+[0-9.]+[ ]+(?P<jobScriptName>[^ ]+)[ ]+(?P<jobOwner>[^ ]+)[ ]+(?P<jobStatus>[^ ]+)[ ]+(?P<jobStartOrSubmitTime>[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+(?P<numSlots>[0-9]+)[ ]+(?P<jobArrayDetails>[^\n]*)[\s]*$')

    # example of machine line :
    # allintel.q@simpatix34.univ-ren BIP 0/6/8 6.00 darwin-x86
    # note : at least one whitespace character (possibly the end of line) is
    # required after the architecture name, so lines must keep their terminator
    _MACHINE_REGEXP = re.compile(r'^(?P<queueName>[^@]+)@(?P<machineName>[^.]+)[^ ]+[ ]+(?P<queueTypeString>[^ ]+)[ ]+(?P<numReservedSlots>[^/]+)/(?P<numUsedSlots>[^/]+)/(?P<numTotalSlots>[^ ]+)[ ]+(?P<cpuLoad>[^ ]+)[\s]+(?P<archName>[^ ]+)[\s]+(?P<queueMachineStatus>[^\s]*)')

    # header line that starts the pending jobs section (matched as a prefix)
    _PENDING_JOBS_HEADER_REGEXP = re.compile(r'^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS')

    # the only non-job lines accepted inside the pending jobs section
    _PENDING_SEPARATOR_REGEXP = re.compile(r'^[#]+$')

    # job array details of a pending job : either a single index ("3") ...
    _SINGLE_INDEX_REGEXP = re.compile(r'^(?P<elementIndex>[0-9]+)$')
    # ... or a range of the form "1-4:1" (min index, max index, step)
    _INDEX_RANGE_REGEXP = re.compile(r'^(?P<minElementIndex>[0-9]+)-(?P<maxElementIndex>[0-9]+):(?P<stepBetweenIndices>[0-9]+)$')

    # a "<field name>:   <field value>" line of "qstat -j <jobid>"; the value
    # is everything up to the end of the line
    _FIELD_REGEXP = re.compile(r'^(?P<fieldName>[^:]+):[ ]+(?P<fieldValue>.*)$')

    # value of the 'parallel environment' field, e.g. 'ompi range: 32'
    _OMPI_REGEXP = re.compile(r'ompi range: (?P<numSlots>[0-9]+)')

    def parseJobState(self, strJobStatus):
        """
        converts the job status string of a job line (eg 'r', 'qw') into a job state

        :param strJobStatus: the status column, one character per state flag
        :return: the sum of the JobStateFlags values of each character (0 for an empty string)
        :raises AssertionError: if a character is not a handled state flag
        """
        # NOTE(review): flags are summed, so a character appearing twice is
        # counted twice; '|=' would be safer if the flags are bit masks - confirm
        jobState = 0
        for c in strJobStatus:
            if c == 'r':
                jobState += JobStateFlags.RUNNING
            elif c == 'w':
                jobState += JobStateFlags.WAITING
            elif c == 'q':
                jobState += JobStateFlags.QUEUED
            elif c == 't':
                jobState += JobStateFlags.TRANSFERING
            elif c == 'd':
                jobState += JobStateFlags.DELETED
            elif c == 'h':
                jobState += JobStateFlags.HOLD
            elif c == 'E':
                jobState += JobStateFlags.ERROR
            else:
                raise AssertionError('unhandled job state flag :"' + c + '"')
        return jobState

    def parseQueueMachineState(self, strQueueMachineStatus):
        """
        converts the status string of a machine line (eg 'd', 'au') into a queue machine state

        :param strQueueMachineStatus: the status column, one character per state flag
        :return: the sum of the QueueMachineStateFlags values of each character (0 for an empty string)
        :raises AssertionError: if a character is not a handled state flag
        """
        queueMachineState = 0
        for c in strQueueMachineStatus:
            if c == 'd':
                queueMachineState += QueueMachineStateFlags.DISABLED
            elif c == 'a':
                queueMachineState += QueueMachineStateFlags.ALARM
            elif c == 'u':
                queueMachineState += QueueMachineStateFlags.UNKNOWN
            elif c == 'E':
                queueMachineState += QueueMachineStateFlags.ERROR
            else:
                raise AssertionError('unhandled queue machine state flag :"' + c + '"')
        return queueMachineState

    def _splitLines(self, text):
        """
        returns the lines of text, each one keeping its '\\n' terminator
        (the sequence that successive readline() calls on the text would return)
        """
        if not text:
            return []
        pieces = text.split('\n')
        lines = [piece + '\n' for piece in pieces[:-1]]
        if pieces[-1]:
            # the text doesn't end with a newline : the last line has no terminator
            lines.append(pieces[-1])
        return lines

    def _parseJobArrayIndices(self, strJobArrayDetails, bInPendingJobsSection, line):
        """
        returns the list of job array element indices described by the last column of a job line

        :param strJobArrayDetails: the (non empty) job array column, eg '3' or '1-4:1,7'
        :param bInPendingJobsSection: whether the job line belongs to the pending jobs section
        :param line: the whole job line (only used in error messages)
        :raises AssertionError: if the details don't have the expected format
        """
        if not bInPendingJobsSection:
            # in the running jobs section, the column just contains the index of the job array element
            iJobArrayElementIndex = int(strJobArrayDetails)
            if iJobArrayElementIndex == 0:
                # sge does not allow element indices to be 0
                raise AssertionError('unexpected job array element index 0 (line="%s")' % line)
            return [iJobArrayElementIndex]

        elementIndices = []
        for strRange in strJobArrayDetails.split(','):
            singleIndexMatch = self._SINGLE_INDEX_REGEXP.match(strRange)
            if singleIndexMatch:
                elementIndices.append(int(singleIndexMatch.group('elementIndex')))
                continue
            # we expect strRange to be of the form "1-4:1", where :
            # the 1st number is the min element index (sge imposes it to be greater than 0)
            # the 2nd number is the max element index
            # the 3rd number is the step between consecutive element indices
            rangeMatch = self._INDEX_RANGE_REGEXP.match(strRange)
            if rangeMatch is None:
                strError = 'unexpected format for job array details : "%s" (line="%s")' % (strRange, line)
                logError(strError)
                raise AssertionError(strError)
            iMinElementIndex = int(rangeMatch.group('minElementIndex'))
            iMaxElementIndex = int(rangeMatch.group('maxElementIndex'))
            iStepBetweenIndices = int(rangeMatch.group('stepBetweenIndices'))
            elementIndices.extend(range(iMinElementIndex, iMaxElementIndex + 1, iStepBetweenIndices))
        return elementIndices

    def _parseQueueMachine(self, machineMatch):
        """
        builds the QueueMachine described by a machine line

        :param machineMatch: the match object of _MACHINE_REGEXP on the machine line
        """
        queueMachine = QueueMachine(machineMatch.group('queueName'), machineMatch.group('machineName'))
        queueMachine.setNumSlots(int(machineMatch.group('numTotalSlots')))
        queueMachine.setNumUsedSlots(int(machineMatch.group('numUsedSlots')))
        strCpuLoad = machineMatch.group('cpuLoad')
        if strCpuLoad != '-NA-':
            # '-NA-' means that no cpu load is reported for this machine
            queueMachine.setCpuLoad(float(strCpuLoad))
        queueMachine.setState(self.parseQueueMachineState(machineMatch.group('queueMachineStatus')))
        return queueMachine

    def parseQstatOutput(self, qstatOutput):
        """
        parses a qstat listing made of machine lines, each followed by the jobs
        running on that machine, then of a pending jobs section
        (presumably the output of "qstat -f" - to be confirmed against the caller)

        :param qstatOutput: the text to parse
        :return: a JobsState containing the queue machines and the jobs found
        :raises AssertionError: if the text doesn't have the expected structure
        """
        jobsState = JobsState()
        currentQueueMachine = None
        bInPendingJobsSection = False
        for line in self._splitLines(qstatOutput):
            jobMatch = self._JOB_REGEXP.match(line)
            if jobMatch:
                # we are dealing with a job line
                if not bInPendingJobsSection and not currentQueueMachine:
                    raise AssertionError('job line found before any machine line : "%s"' % line)
                iJobId = int(jobMatch.group('jobId'))
                jobState = self.parseJobState(jobMatch.group('jobStatus'))
                iNumSlots = int(jobMatch.group('numSlots'))
                strJobArrayDetails = jobMatch.group('jobArrayDetails')
                bIsJobArray = (len(strJobArrayDetails) != 0)
                # each element of a job array is treated as a separate job for the sake of simplicity.
                # For these elements, the job id in sge sense is the same, but they are different in this program's sense
                if bIsJobArray:
                    jobElementsIndexRange = self._parseJobArrayIndices(strJobArrayDetails, bInPendingJobsSection, line)
                else:
                    jobElementsIndexRange = [0]  # just one element (its index is not used)
                for iElementIndex in jobElementsIndexRange:
                    if bIsJobArray:
                        jobId = JobId(iJobId, iElementIndex)
                    else:
                        jobId = JobId(iJobId)
                    job = jobsState.getJob(jobId)
                    if job is None:
                        # this job hasn't been encountered yet in the output of qstat ...
                        # we could either be in the pending jobs section or in the running jobs section
                        job = Job(jobId)
                        jobsState.addJob(job)
                        job.setState(jobState)
                        jobStartOrSubmitTime = time.strptime(jobMatch.group('jobStartOrSubmitTime'), '%m/%d/%Y %H:%M:%S')
                        if bInPendingJobsSection:
                            job.setSubmitTime(jobStartOrSubmitTime)
                        else:
                            job.setStartTime(jobStartOrSubmitTime)
                        job.setOwner(jobMatch.group('jobOwner'))
                        job.setScriptName(jobMatch.group('jobScriptName'))
                        if bInPendingJobsSection:
                            job.setNumRequiredSlots(iNumSlots)
                    elif bInPendingJobsSection:
                        # if we are in the pending jobs section, the job should be new
                        raise AssertionError('pending job already encountered : "%s"' % line)
                    if not bInPendingJobsSection:
                        # a running job uses slots on the machine whose line precedes it
                        job.addSlots(currentQueueMachine.getName(), iNumSlots)
            elif bInPendingJobsSection:
                # in the pending jobs section, the only other lines we expect are separators
                if not self._PENDING_SEPARATOR_REGEXP.match(line):
                    strError = 'unexpected line in the pending jobs section : "%s"' % line
                    logError(strError)
                    raise AssertionError(strError)
            else:
                # the current line does not describe a job : check if it describes the status of a machine
                machineMatch = self._MACHINE_REGEXP.match(line)
                if machineMatch:
                    currentQueueMachine = self._parseQueueMachine(machineMatch)
                    jobsState.addQueueMachine(currentQueueMachine)
                elif self._PENDING_JOBS_HEADER_REGEXP.match(line):
                    bInPendingJobsSection = True
                    currentQueueMachine = None
                # any other line is ignored
        return jobsState

    def parseJobDetails(self, qstatOutput, job):
        """
        adds to job the details parsed from the output of the "qstat -j <jobid>" command

        :param qstatOutput: the text to parse, made of "<field name>:   <field value>" lines
        :param job: the job to complete; its m_jobRequirements.m_queues is set from the
            'hard_queue_list' field and its m_jobRequirements.m_parallelEnvironment from
            the 'parallel environment' field
        :raises AssertionError: if the 'job_number' field doesn't match job's id, or if
            the parallel environment is not handled
        """
        for line in self._splitLines(qstatOutput):
            fieldMatch = self._FIELD_REGEXP.match(line)
            if not fieldMatch:
                continue
            fieldName = fieldMatch.group('fieldName')
            strFieldValue = fieldMatch.group('fieldValue').rstrip()
            if fieldName == 'job_number':
                # NOTE(review): assumes JobId.asStr() yields the bare sge job
                # number - confirm for job array elements
                if job.getId().asStr() != strFieldValue:
                    raise AssertionError('job_number "%s" doesn\'t match job "%s"' % (strFieldValue, job.getId().asStr()))
            elif fieldName == 'hard_queue_list':
                job.m_jobRequirements.m_queues = strFieldValue.split(',')
            elif fieldName == 'parallel environment':
                # the value could be 'ompi range: 32'
                if self._OMPI_REGEXP.match(strFieldValue):
                    job.m_jobRequirements.m_parallelEnvironment = ParallelEnvironment.MPI
                else:
                    # NOTE(review): any parallel environment other than ompi is rejected,
                    # as the original code did - confirm this is still wanted
                    raise AssertionError('unhandled parallel environment : "%s"' % strFieldValue)
            # the other fields are ignored