import StringIO import re from JobsState import * from QueueMachine import * from Util import * from Log import * from Job import * class QstatParser: def parseJobState( self, strJobStatus ): jobState = 0 for i in range(0, len(strJobStatus) ): c = strJobStatus[i] if c == 'r': jobState += JobStateFlags.RUNNING elif c == 'w': jobState += JobStateFlags.WAITING elif c == 'q': jobState += JobStateFlags.QUEUED elif c == 't': jobState += JobStateFlags.TRANSFERING elif c == 'd': jobState += JobStateFlags.DELETED elif c == 'h': jobState += JobStateFlags.HOLD elif c == 's': jobState += JobStateFlags.SUSPENDED elif c == 'E': jobState += JobStateFlags.ERROR else: assert False, 'unhandled job state flag :"' + c + '"' return jobState def parseQueueMachineState( self, strQueueMachineStatus ): queueMachineState = 0 for i in range(0, len(strQueueMachineStatus) ): c = strQueueMachineStatus[i] if c == 'd': queueMachineState += QueueMachineStateFlags.DISABLED elif c == 'a': queueMachineState += QueueMachineStateFlags.ALARM elif c == 'u': queueMachineState += QueueMachineStateFlags.UNKNOWN elif c == 'E': queueMachineState += QueueMachineStateFlags.ERROR elif c == 'o': queueMachineState += QueueMachineStateFlags.OBSOLETE elif c == 's': queueMachineState += QueueMachineStateFlags.SUSPENDED else: assert False, 'unhandled queue machine state flag :"' + c + '"' return queueMachineState def parseQstatOutput( self, qstatOutput ): jobsState = JobsState() f = StringIO.StringIO(qstatOutput) line = f.readline() currentQueueMachine = None bInPendingJobsSection = False # examples of job line : # 43521 0.55108 Confidiso3 aghoufi r 08/19/2009 18:40:09 1 # a typical job line in the pending jobs section looks like this : # 43645 0.00000 LC_LV_MC aghoufi qw 08/21/2009 08:14:58 1 # a typical running job array line looks like this # 43619 0.56000 SimpleJobA raffy r 08/20/2009 18:13:03 1 3 # a typical job array line in the pending jobs section looks like this # 43646 0.00000 SimpleJobA raffy qw 08/21/2009 09:56:40 1 1-4:1 jobRegularExp = re.compile( '^[ ]*(?P[^ ]+)[ ]+[0-9.]+[ ]+(?P[^ ]+)[ ]+(?P[^ ]+)[ ]+(?P[^ ]+)[ ]+(?P[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+(?P[0-9]+)[ ]+(?P[^\n]*)[\s]*$' ) # example of machine line : # allintel.q@simpatix34.univ-ren BIP 0/6/8 6.00 darwin-x86 machineRegularExp = re.compile( '^(?P[^@]+)@(?P[^.]+)[^ ]+[ ]+(?P[^ ]+)[ ]+(?P[^/]+)/(?P[^/]+)/(?P[^ ]+)[ ]+(?P[^ ]+)[\s]+(?P[^ ]+)[\s]+(?P[^\s]*)' ) pendingJobsHeaderRegularExp = re.compile( '^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS[?]*' ) while( len(line) > 0 ): # print line # check if the current line is a line describing a job running on a machine matchObj = jobRegularExp.match( line ) if matchObj: # we are dealing with a job line if not bInPendingJobsSection: assert( currentQueueMachine ) #log('QstatParser::parseQstatOutput : jobId = "'+matchObj.group('jobId')+'"') iJobId = int(matchObj.group('jobId')) jobState = self.parseJobState( matchObj.group('jobStatus') ) strJobArrayDetails = matchObj.group('jobArrayDetails') bIsJobArray = (len(strJobArrayDetails) != 0) #logDebug('strJobArrayDetails = "%s", bIsJobArray=%d' % (strJobArrayDetails, int(bIsJobArray))) # each element of a job array is treated as a separate job for the sake of simplicity. # For these elements, the job id in sge sense is the same, but they are different in this program's sense jobElementsIndexRange = range(0,1) # just one element, unless it's a job array if bIsJobArray: if bInPendingJobsSection: jobElementsIndexRange = [] astrRanges = re.split(',', strJobArrayDetails) for strRange in astrRanges: singleIndexMatch = re.match('^(?P[0-9]+)$', strRange) if singleIndexMatch: iElementIndex = int(singleIndexMatch.group('elementIndex')) jobElementsIndexRange.extend(range(iElementIndex, iElementIndex+1)) else: # we expect strRange to be of the form "1-4:1", where : # the 1st number is the min element index (sge imposes it to be greater than 0) # the 2nd number is the max element index # the 3rd number is the step between consecutive element indices rangeMatch = re.match( '^(?P[0-9]+)-(?P[0-9]+):(?P[0-9]+)$', strRange) if rangeMatch == None: logError('unexpected format for job array details : "%s" (line="%s"' % (strRange, line) ) assert(False) iMinElementIndex=int(rangeMatch.group('minElementIndex')) iMaxElementIndex=int(rangeMatch.group('maxElementIndex')) iStepBetweenIndices=int(rangeMatch.group('stepBetweenIndices')) jobElementsIndexRange.extend(range(iMinElementIndex, iMaxElementIndex+1, iStepBetweenIndices)) else: # we are in the running jobs section, and here we expect the strJobArrayDetails to just contain the index of the job array element iJobArrayElementIndex = int(strJobArrayDetails) assert(iJobArrayElementIndex != 0) # sge does not allow element indices to be 0 jobElementsIndexRange = range(iJobArrayElementIndex,iJobArrayElementIndex+1) for iElementIndex in jobElementsIndexRange: jobId = None if bIsJobArray: jobId = JobId(iJobId, iElementIndex) else: jobId = JobId(iJobId) job = jobsState.getJob(jobId) #logDebug('iElementIndex = %d job id = %s' % (iElementIndex, jobId.asStr())) if job == None: # this job hasn't been encountered yet in the output of qstat ... # we could either be in the pending jobs section or in the running jobs section job = Job(jobId) jobsState.addJob( job ) job.setState( jobState ) strJobStartOrSubmitTime = matchObj.group('jobStartOrSubmitTime') jobStartOrSubmitTime = time.strptime(strJobStartOrSubmitTime, '%m/%d/%Y %H:%M:%S') if bInPendingJobsSection: job.setSubmitTime( jobStartOrSubmitTime ) else: job.setStartTime( jobStartOrSubmitTime ) job.setOwner( matchObj.group('jobOwner') ) job.setScriptName( matchObj.group('jobScriptName') ) if bInPendingJobsSection: job.setNumRequiredSlots(int(matchObj.group('numSlots'))) else: assert( not bInPendingJobsSection ) # if we are in the pending jobs section, the job should be new if not bInPendingJobsSection: job.addSlots( currentQueueMachine.getName(), int(matchObj.group('numSlots')) ) else: # the current line does not describe a job if not bInPendingJobsSection: # check if this line describes the status of a machine matchObj = machineRegularExp.match( line ) if matchObj: queueName = matchObj.group('queueName') machineName = matchObj.group('machineName') queueMachine = QueueMachine( queueName, machineName ) #log(line) #log('matchObj.group(queueTypeString) :' + matchObj.group('queueTypeString')) #log('matchObj.group(numTotalSlots) :' + matchObj.group('numTotalSlots')) queueMachine.setNumSlots( int( matchObj.group('numTotalSlots') ) ) queueMachine.setNumUsedSlots( int( matchObj.group('numUsedSlots') ) ) strCpuLoad = matchObj.group('cpuLoad') if strCpuLoad != '-NA-': queueMachine.setCpuLoad( float(strCpuLoad) ) strQueueMachineState = matchObj.group('queueMachineStatus') queueMachine.setState( self.parseQueueMachineState( strQueueMachineState ) ) #log('QstatParser::parseQstatOutput : queueName = "'+matchObj.group('queueName')+'"') #log('QstatParser::parseQstatOutput : machineName = "'+matchObj.group('machineName')+'"') currentQueueMachine = queueMachine jobsState.addQueueMachine( queueMachine ) else: matchObj = pendingJobsHeaderRegularExp.match( line ) if matchObj: bInPendingJobsSection = True currentQueueMachine = None else: #print line None else: # we are in a pending jobs section matchObj = re.match('^[#]+$', line) if not matchObj: # unexpected line print 'line = "' + line + '"' assert( False ) None line = f.readline() f.close() return jobsState def parseJobDetails( self, qstatOutput, job ): """ adds to job the details parsed from the output of the "qstat -j " command """ f = StringIO.StringIO(qstatOutput) line = f.readline() fieldRegularExp = re.compile( '^(?P[^:]+):[ ]+(?P[?]*)$' ) while( len(line) > 0 ): # print line # check if the current line is a line describing a job running on a machine matchObj = fieldRegularExp.match( line ) if matchObj: fieldName = matchObj.group('fieldName') strFieldValue = matchObj.group('fieldValue') if fieldName == 'job_number': assert( job.getId().asStr() == strFieldValue ) elif fieldName == 'hard_queue_list': allowedQueues = strFieldValue.split(',') assert(len(allowedQueues) > 0) job.m_jobRequirements.m_queues = allowedQueues elif fieldName == 'parallel environment': # the value could be 'ompi range: 32' matchObj = re.match('ompi range: (?P[0-9]+)[?]*', strFieldValue) if matchObj: job.m_jobRequirements.m_parallelEnvironment = ParallelEnvironment.MPI else: assert( False ) else: # ignore he other fields None line = f.readline() f.close()