185 lines
8.4 KiB
Python
185 lines
8.4 KiB
Python
import StringIO
|
|
import re
|
|
from JobsState import *
|
|
from QueueMachine import *
|
|
from Util import *
|
|
from Log import *
|
|
from Job import *
|
|
|
|
class QstatParser:
    """
    Parses the text printed by the qstat command into the program's job and
    queue machine objects.

    The example lines quoted in the comments below come from sge's qstat.
    All patterns are compiled once, when the class is created, instead of on
    every call.
    """

    # splits a text into its lines, each line keeping its '\n' terminator
    # (same line boundaries as StringIO.readline(): only '\n' ends a line)
    _LINE_REGEXP = re.compile(r'[^\n]*\n|[^\n]+')

    # examples of job line :
    #   43521 0.55108 Confidiso3 aghoufi      r     08/19/2009 18:40:09     1
    # a typical job line in the pending jobs section looks like this :
    #   43645 0.00000 LC_LV_MC   aghoufi      qw    08/21/2009 08:14:58     1
    # a typical running job array line looks like this
    #   43619 0.56000 SimpleJobA raffy        r     08/20/2009 18:13:03     1 3
    # a typical job array line in the pending jobs section looks like this
    #   43646 0.00000 SimpleJobA raffy        qw    08/21/2009 09:56:40     1 1-4:1
    _JOB_LINE_REGEXP = re.compile(
        r'^[ ]*(?P<jobId>[^ ]+)[ ]+[0-9.]+[ ]+(?P<jobScriptName>[^ ]+)[ ]+'
        r'(?P<jobOwner>[^ ]+)[ ]+(?P<jobStatus>[^ ]+)[ ]+'
        r'(?P<jobStartOrSubmitTime>[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] '
        r'[0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+'
        r'(?P<numSlots>[0-9]+)[ ]+(?P<jobArrayDetails>[^\n]*)[\s]*$')

    # example of machine line :
    #   allintel.q@simpatix34.univ-ren BIP   0/6/8          6.00     darwin-x86
    # The machine name is the host name up to its first dot. It is not allowed
    # to contain a space, so that a host name without any dot is captured
    # correctly instead of swallowing the following columns.
    _MACHINE_LINE_REGEXP = re.compile(
        r'^(?P<queueName>[^@]+)@(?P<machineName>[^. ]+)[^ ]*[ ]+'
        r'(?P<queueTypeString>[^ ]+)[ ]+'
        r'(?P<numReservedSlots>[^/]+)/(?P<numUsedSlots>[^/]+)/(?P<numTotalSlots>[^ ]+)')

    # the line that separates the running jobs from the pending jobs
    _PENDING_JOBS_HEADER_REGEXP = re.compile(
        r'^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS')

    # the only non-job line accepted inside the pending jobs section
    _HASH_SEPARATOR_REGEXP = re.compile(r'^[#]+$')

    # the two accepted forms of a pending job array range : "3" and "1-4:1"
    _SINGLE_INDEX_REGEXP = re.compile(r'^(?P<elementIndex>[0-9]+)$')
    _INDEX_RANGE_REGEXP = re.compile(
        r'^(?P<minElementIndex>[0-9]+)-(?P<maxElementIndex>[0-9]+):(?P<stepBetweenIndices>[0-9]+)$')

    # a "<field name>: <field value>" line of "qstat -j <jobid>"; the value is
    # everything after the blanks that follow the first colon, without its
    # trailing whitespace
    _FIELD_REGEXP = re.compile(r'^(?P<fieldName>[^:]+):[ ]+(?P<fieldValue>[^\n]*?)[\s]*$')

    # the only parallel environment value that is understood, eg 'ompi range: 32'
    _OMPI_RANGE_REGEXP = re.compile(r'ompi range: (?P<numSlots>[0-9]+)')

    def _iterLines(self, text):
        """
        returns the list of the lines of text, each one with its trailing
        '\\n' when it has one (an empty text gives an empty list)
        """
        return self._LINE_REGEXP.findall(text)

    def parseJobState(self, strJobStatus):
        """
        converts the status column of a job line (eg 'r' or 'qw') into the sum
        of the matching JobStateFlags values

        raises AssertionError if strJobStatus contains a character other than
        'r', 'w', 'q' and 't'
        """
        jobState = 0
        for c in strJobStatus:
            if c == 'r':
                jobState += JobStateFlags.RUNNING
            elif c == 'w':
                jobState += JobStateFlags.WAITING
            elif c == 'q':
                jobState += JobStateFlags.QUEUED
            elif c == 't':
                jobState += JobStateFlags.TRANSFERING
            else:
                # raised explicitly (instead of using the assert statement)
                # so that the check is still performed when python runs with -O
                raise AssertionError('unhandled job state flag :"' + c + '"')
        return jobState

    def _parseJobArrayElementIndices(self, strJobArrayDetails, bInPendingJobsSection, line):
        """
        returns the list of job array element indices described by the last
        column of a job array line

        strJobArrayDetails : the non-empty last column of the job line
        bInPendingJobsSection : True if the line belongs to the pending jobs section
        line : the whole line, only used in the error message

        raises AssertionError if the details don't have the expected format
        """
        if not bInPendingJobsSection:
            # we are in the running jobs section, and here we expect the
            # details to just contain the index of the job array element
            iJobArrayElementIndex = int(strJobArrayDetails)
            if iJobArrayElementIndex == 0:
                # sge does not allow element indices to be 0
                raise AssertionError('unexpected job array element index 0 (line="%s")' % line)
            return [iJobArrayElementIndex]

        elementIndices = []
        for strRange in strJobArrayDetails.split(','):
            singleIndexMatch = self._SINGLE_INDEX_REGEXP.match(strRange)
            if singleIndexMatch:
                elementIndices.append(int(singleIndexMatch.group('elementIndex')))
                continue
            # we expect strRange to be of the form "1-4:1", where :
            # the 1st number is the min element index (sge imposes it to be greater than 0)
            # the 2nd number is the max element index
            # the 3rd number is the step between consecutive element indices
            rangeMatch = self._INDEX_RANGE_REGEXP.match(strRange)
            if rangeMatch is None:
                logError('unexpected format for job array details : "%s" (line="%s"' % (strRange, line))
                raise AssertionError('unexpected format for job array details : "%s"' % strRange)
            iMinElementIndex = int(rangeMatch.group('minElementIndex'))
            iMaxElementIndex = int(rangeMatch.group('maxElementIndex'))
            iStepBetweenIndices = int(rangeMatch.group('stepBetweenIndices'))
            elementIndices.extend(range(iMinElementIndex, iMaxElementIndex + 1, iStepBetweenIndices))
        return elementIndices

    def _parseJobLine(self, matchObj, line, jobsState, currentQueueMachine, bInPendingJobsSection):
        """
        registers in jobsState the job (or the job array elements) described by
        a job line

        matchObj : the match of the job line pattern on line
        line : the job line itself, only used in error messages
        jobsState : the object that receives the jobs
        currentQueueMachine : the queue machine described by the latest machine
            line (not used for a line of the pending jobs section)
        bInPendingJobsSection : True if the line belongs to the pending jobs section

        raises AssertionError if a running job line appears before any machine
        line, or if a pending job has already been encountered
        """
        # 'time' is not imported by name among the imports visible at the top
        # of this file; importing it here avoids relying on a wildcard import
        # to provide it
        import time

        if not bInPendingJobsSection and not currentQueueMachine:
            raise AssertionError('job line found before any machine line : "%s"' % line)

        iJobId = int(matchObj.group('jobId'))
        jobState = self.parseJobState(matchObj.group('jobStatus'))
        iNumSlots = int(matchObj.group('numSlots'))
        # stripped so that trailing blanks or a '\r' are not mistaken for job
        # array details
        strJobArrayDetails = matchObj.group('jobArrayDetails').strip()
        bIsJobArray = (len(strJobArrayDetails) != 0)

        # each element of a job array is treated as a separate job for the sake of simplicity.
        # For these elements, the job id in sge sense is the same, but they are different in this program's sense
        if bIsJobArray:
            jobElementsIndexRange = self._parseJobArrayElementIndices(strJobArrayDetails, bInPendingJobsSection, line)
        else:
            jobElementsIndexRange = [0]  # just one element, whose index is not used

        for iElementIndex in jobElementsIndexRange:
            if bIsJobArray:
                jobId = JobId(iJobId, iElementIndex)
            else:
                jobId = JobId(iJobId)
            job = jobsState.getJob(jobId)
            if job is None:
                # this job hasn't been encountered yet in the output of qstat ...
                # we could either be in the pending jobs section or in the running jobs section
                job = Job(jobId)
                jobsState.addJob(job)
                job.setState(jobState)
                jobStartOrSubmitTime = time.strptime(matchObj.group('jobStartOrSubmitTime'), '%m/%d/%Y %H:%M:%S')
                if bInPendingJobsSection:
                    job.setSubmitTime(jobStartOrSubmitTime)
                else:
                    job.setStartTime(jobStartOrSubmitTime)
                job.setOwner(matchObj.group('jobOwner'))
                job.setScriptName(matchObj.group('jobScriptName'))
                if bInPendingJobsSection:
                    job.setNumRequiredSlots(iNumSlots)
            elif bInPendingJobsSection:
                # if we are in the pending jobs section, the job should be new
                raise AssertionError('pending job encountered more than once : "%s"' % line)
            if not bInPendingJobsSection:
                # a running job that has already been encountered just gets
                # additional slots on the current machine
                job.addSlots(currentQueueMachine.getMachineName(), iNumSlots)

    def parseQstatOutput(self, qstatOutput):
        """
        builds a JobsState from the output of qstat

        qstatOutput : the text printed by qstat, made of machine lines each
            followed by the lines of the jobs running on that machine, then of
            a pending jobs section (see the examples next to the patterns at
            the top of this class)

        returns a JobsState to which every queue machine and every job found
        in qstatOutput has been added

        raises AssertionError when the text doesn't have the expected structure
        """
        jobsState = JobsState()
        currentQueueMachine = None
        bInPendingJobsSection = False

        for line in self._iterLines(qstatOutput):
            # check if the current line is a line describing a job
            matchObj = self._JOB_LINE_REGEXP.match(line)
            if matchObj:
                self._parseJobLine(matchObj, line, jobsState, currentQueueMachine, bInPendingJobsSection)
                continue

            # the current line does not describe a job
            if bInPendingJobsSection:
                # in the pending jobs section, the only other lines we expect
                # are the ones made of '#' characters
                if not self._HASH_SEPARATOR_REGEXP.match(line):
                    print('line = "' + line + '"')
                    raise AssertionError('unexpected line in the pending jobs section : "%s"' % line)
                continue

            # check if this line describes the status of a machine
            matchObj = self._MACHINE_LINE_REGEXP.match(line)
            if matchObj:
                queueMachine = QueueMachine(matchObj.group('queueName'), matchObj.group('machineName'))
                queueMachine.setNumSlots(int(matchObj.group('numTotalSlots')))
                currentQueueMachine = queueMachine
                jobsState.addQueueMachine(queueMachine)
            elif self._PENDING_JOBS_HEADER_REGEXP.match(line):
                bInPendingJobsSection = True
                currentQueueMachine = None
            # any other line of the running jobs section is ignored

        return jobsState

    def parseJobDetails(self, qstatOutput, job):
        """
        adds to job the details parsed from the output of the "qstat -j <jobid>" command

        qstatOutput : the text printed by "qstat -j <jobid>"
        job : the job to complete; its m_jobRequirements attribute receives
            the allowed queues and the parallel environment

        raises AssertionError if the job number found in qstatOutput is not the
        id of job, or if the parallel environment is not an 'ompi range' one
        """
        for line in self._iterLines(qstatOutput):
            # check if the current line is of the form "<field name>: <field value>"
            matchObj = self._FIELD_REGEXP.match(line)
            if not matchObj:
                continue
            fieldName = matchObj.group('fieldName')
            strFieldValue = matchObj.group('fieldValue')
            if fieldName == 'job_number':
                if job.getId().asStr() != strFieldValue:
                    raise AssertionError('unexpected job number "%s" (expected "%s")' % (strFieldValue, job.getId().asStr()))
            elif fieldName == 'hard_queue_list':
                # split always returns at least one element, so the list of
                # allowed queues can't be empty
                job.m_jobRequirements.m_queues = strFieldValue.split(',')
            elif fieldName == 'parallel environment':
                # the value could be 'ompi range: 32'
                if self._OMPI_RANGE_REGEXP.match(strFieldValue):
                    job.m_jobRequirements.m_parallelEnvironment = ParallelEnvironment.MPI
                else:
                    raise AssertionError('unhandled parallel environment : "%s"' % strFieldValue)
            # the other fields are ignored
|
|
|