import StringIO
import re
import time  # needed by time.strptime() in QstatParser.parseQstatOutput
from JobsState import *
from QueueMachine import *
from Util import *
from Log import *
from Job import *


class QstatParser:
    """Parser for the textual output of Sun Grid Engine's qstat command."""

    def parseJobState(self, strJobStatus):
        """
            converts a job status string as printed by qstat (eg 'qw') into a
            sum of JobStateFlags values

            :param str strJobStatus: the job's status; each character is one state flag
            :return: the sum of the JobStateFlags values associated with each character (0 for an empty string)
            :raises AssertionError: if a character is not a known job state flag
        """
        jobState = 0
        for c in strJobStatus:
            if c == 'r':
                jobState += JobStateFlags.RUNNING
            elif c == 'w':
                jobState += JobStateFlags.WAITING
            elif c == 'q':
                jobState += JobStateFlags.QUEUED
            elif c == 't':
                jobState += JobStateFlags.TRANSFERING
            elif c == 'd':
                jobState += JobStateFlags.DELETED
            elif c == 'h':
                jobState += JobStateFlags.HOLD
            elif c == 's':
                jobState += JobStateFlags.SUSPENDED
            elif c == 'E':
                jobState += JobStateFlags.ERROR
            else:
                assert False, 'unhandled job state flag :"' + c + '"'
        return jobState

    def parseQueueMachineState(self, strQueueMachineStatus):
        """
            converts a queue machine status string as printed by qstat (eg 'au')
            into a sum of QueueMachineStateFlags values

            :param str strQueueMachineStatus: the queue machine's status; each character is one state flag
            :return: the sum of the QueueMachineStateFlags values associated with each character (0 for an empty string)
            :raises AssertionError: if a character is not a known queue machine state flag
        """
        queueMachineState = 0
        for c in strQueueMachineStatus:
            if c == 'd':
                queueMachineState += QueueMachineStateFlags.DISABLED
            elif c == 'a':
                queueMachineState += QueueMachineStateFlags.ALARM
            elif c == 'u':
                queueMachineState += QueueMachineStateFlags.UNKNOWN
            elif c == 'E':
                queueMachineState += QueueMachineStateFlags.ERROR
            elif c == 'o':
                queueMachineState += QueueMachineStateFlags.OBSOLETE
            elif c == 's':
                queueMachineState += QueueMachineStateFlags.SUSPENDED
            else:
                assert False, 'unhandled queue machine state flag :"' + c + '"'
        return queueMachineState

    def parseQstatOutput(self, qstatOutput):
        r"""
            parses result of command 'qstat -f -u \* -pri'

            :param str qstatOutput: the text printed by the qstat command
            :return: a JobsState populated with the queue machines and the jobs (running and pending) found in qstatOutput
            :raises AssertionError: if a line has an unexpected format
        """
        singleIndexRegularExp = re.compile(r'^(?P<elementIndex>[0-9]+)$')
        # a range is of the form "1-4:1", where :
        # the 1st number is the min element index (sge imposes it to be greater than 0)
        # the 2nd number is the max element index
        # the 3rd number is the step between consecutive element indices
        rangeRegularExp = re.compile(r'^(?P<minElementIndex>[0-9]+)-(?P<maxElementIndex>[0-9]+):(?P<stepBetweenIndices>[0-9]+)$')

        def parse_pending_tasks(task_ranges_sequence):
            """
                parses a job's task ids encoded in the form of a string containing a sequence of ranges

                :param str task_ranges_sequence: a job's task ids encoded in the form of a string containing a sequence of non overlapping ranges separated with a comma.
                    Each element is expected to be either a single index or a range in the form "<min_index>-<max_index>:<step>"
                :return list(int): the list of task ids

                for example, this function would return [1, 2, 3, 4, 6, 7, 8] for the input string "1-4:1,6-8:1"
            """
            task_ids = []
            for strRange in task_ranges_sequence.split(','):
                singleIndexMatch = singleIndexRegularExp.match(strRange)
                if singleIndexMatch:
                    task_ids.append(int(singleIndexMatch.group('elementIndex')))
                    continue
                rangeMatch = rangeRegularExp.match(strRange)
                if rangeMatch is None:
                    # 'line' is the qstat output line currently processed by the enclosing method
                    logError('unexpected format for job array details : "%s" (line="%s"' % (strRange, line))
                    assert(False)
                iMinElementIndex = int(rangeMatch.group('minElementIndex'))
                iMaxElementIndex = int(rangeMatch.group('maxElementIndex'))
                iStepBetweenIndices = int(rangeMatch.group('stepBetweenIndices'))
                task_ids.extend(range(iMinElementIndex, iMaxElementIndex + 1, iStepBetweenIndices))
            return task_ids

        # ugly hack to work around the fact that qstat truncates the fqdn of cluster nodes
        # graffy@physix-master:~$ qstat -f -u \*
        # queuename                      qtype resv/used/tot. load_avg arch          states
        # ---------------------------------------------------------------------------------
        # main.q@physix88.ipr.univ-renne BIP   0/0/36         14.03    lx-amd64
        # TODO: fix this properly by parsing the output of 'qstat -f -u \* -xml' instead of 'qstat -f -u \*'
        qstatOutput = re.sub(r'\.univ[^ ]*', '.univ-rennes1.fr', qstatOutput)

        jobsState = JobsState()
        currentQueueMachine = None  # the queue machine that the job lines currently being read belong to
        bInPendingJobsSection = False

        # examples of job line :
        # 43521 0.55108 Confidiso3 aghoufi r 08/19/2009 18:40:09 1
        # a typical job line in the pending jobs section looks like this :
        # 43645 0.00000 LC_LV_MC aghoufi qw 08/21/2009 08:14:58 1
        # a typical running job array line looks like this
        # 43619 0.56000 SimpleJobA raffy r 08/20/2009 18:13:03 1 3
        # a typical job array line in the pending jobs section looks like this
        # 43646 0.00000 SimpleJobA raffy qw 08/21/2009 09:56:40 1 1-4:1

        # the -pri option of qstat adds the following columns between the priority and the job's name :
        # nurg    The job's total urgency value in normalized fashion.
        # npprior The job's -p priority in normalized fashion.
        # ntckts  The job's ticket amount in normalized fashion.
        # ppri    The job's -p priority as specified by the user.
        jobRegularExp = re.compile(
            r'^[ ]*(?P<jobId>[^ ]+)'
            r'[ ]+(?P<jobPriority>[0-9.]+)'
            r'[ ]+(?P<nurg>[0-9.]+)'
            r'[ ]+(?P<npprior>[0-9.]+)'
            r'[ ]+(?P<ntckts>[0-9.]+)'
            r'[ ]+(?P<ppri>[0-9]+)'
            r'[ ]+(?P<jobScriptName>[^ ]+)'
            r'[ ]+(?P<jobOwner>[^ ]+)'
            r'[ ]+(?P<jobStatus>[^ ]+)'
            r'[ ]+(?P<jobStartOrSubmitTime>[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])'
            r'[ ]+(?P<numSlots>[0-9]+)'
            r'[ ]+(?P<jobArrayDetails>[^\n]*)[\s]*$')

        # example of machine line :
        # allintel.q@simpatix34.univ-ren BIP 0/6/8 6.00 darwin-x86
        machineRegularExp = re.compile(
            r'^(?P<queueName>[^@]+)@(?P<machineName>[^ ]+)'
            r'[ ]+(?P<queueTypeString>[^ ]+)'
            r'[ ]+(?P<numReservedSlots>[^/]+)/(?P<numUsedSlots>[^/]+)/(?P<numTotalSlots>[^ ]+)'
            r'[ ]+(?P<cpuLoad>[^ ]+)'
            r'[\s]+(?P<archName>[^ ]+)'
            r'[\s]+(?P<queueMachineStatus>[^\s]*)')

        # NOTE(review): the trailing [?]* only matches '?' characters; it is harmless here because the
        # pattern is not anchored at its end, but it may be a garbled version of something else -- confirm
        pendingJobsHeaderRegularExp = re.compile(r'^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS[?]*')

        f = StringIO.StringIO(qstatOutput)
        for line in f:
            # check if the current line is a line describing a job
            jobMatch = jobRegularExp.match(line)
            if jobMatch:
                # we are dealing with a job line
                if not bInPendingJobsSection:
                    # a running job is always listed below the queue machine it runs on
                    assert(currentQueueMachine)
                iJobId = int(jobMatch.group('jobId'))
                jobState = self.parseJobState(jobMatch.group('jobStatus'))
                strJobArrayDetails = jobMatch.group('jobArrayDetails')
                bIsJobArray = (len(strJobArrayDetails) != 0)
                # each element of a job array is treated as a separate job for the sake of simplicity.
                # For these elements, the job id in sge sense is the same, but they are different in this program's sense
                task_ids = [0]  # just one element, unless it's a job array
                if bIsJobArray:
                    if bInPendingJobsSection:
                        task_ids = parse_pending_tasks(strJobArrayDetails)
                    else:
                        # we are in the running jobs section, and here we expect the strJobArrayDetails to just contain the index of the job array element
                        iJobArrayElementIndex = int(strJobArrayDetails)
                        assert(iJobArrayElementIndex != 0)  # sge does not allow element indices to be 0
                        task_ids = [iJobArrayElementIndex]
                for task_id in task_ids:
                    if bIsJobArray:
                        jobId = JobId(iJobId, task_id)
                    else:
                        jobId = JobId(iJobId)
                    job = jobsState.getJob(jobId)
                    if job is None:
                        # this job hasn't been encountered yet in the output of qstat ...
                        # we could either be in the pending jobs section or in the running jobs section
                        job = Job(jobId)
                        jobsState.addJob(job)
                        job.setState(jobState)
                        strJobStartOrSubmitTime = jobMatch.group('jobStartOrSubmitTime')
                        jobStartOrSubmitTime = time.strptime(strJobStartOrSubmitTime, '%m/%d/%Y %H:%M:%S')
                        if bInPendingJobsSection:
                            job.setSubmitTime(jobStartOrSubmitTime)
                        else:
                            job.setStartTime(jobStartOrSubmitTime)
                        job.setOwner(jobMatch.group('jobOwner'))
                        job.setScriptName(jobMatch.group('jobScriptName'))
                        if bInPendingJobsSection:
                            job.setNumRequiredSlots(int(jobMatch.group('numSlots')))
                    else:
                        assert(not bInPendingJobsSection)  # if we are in the pending jobs section, the job should be new
                    if not bInPendingJobsSection:
                        # a running job can appear under several queue machines; accumulate the slots it uses on each
                        job.addSlots(currentQueueMachine.getName(), int(jobMatch.group('numSlots')))
            elif not bInPendingJobsSection:
                # the current line does not describe a job : check if it describes the status of a machine
                machineMatch = machineRegularExp.match(line)
                if machineMatch:
                    queueMachine = QueueMachine(machineMatch.group('queueName'), machineMatch.group('machineName'))
                    queueMachine.setNumSlots(int(machineMatch.group('numTotalSlots')))
                    queueMachine.setNumUsedSlots(int(machineMatch.group('numUsedSlots')))
                    strCpuLoad = machineMatch.group('cpuLoad')
                    if strCpuLoad != '-NA-':
                        # the cpu load is left unset when qstat reports it as '-NA-'
                        queueMachine.setCpuLoad(float(strCpuLoad))
                    strQueueMachineState = machineMatch.group('queueMachineStatus')
                    queueMachine.setState(self.parseQueueMachineState(strQueueMachineState))
                    currentQueueMachine = queueMachine
                    jobsState.addQueueMachine(queueMachine)
                elif pendingJobsHeaderRegularExp.match(line):
                    bInPendingJobsSection = True
                    currentQueueMachine = None
                # any other line of the running jobs section (header, separator...) is ignored
            else:
                # we are in a pending jobs section : the only non-job lines expected are made of '#' characters
                if not re.match(r'^[#]+$', line):
                    # unexpected line
                    print('line = "' + line + '"')
                    assert(False)
        f.close()
        return jobsState

    def parseJobDetails(self, qstatOutput, job):
        """
            adds to job the details parsed from the output of the "qstat -j <jobid>" command

            :param str qstatOutput: the text printed by the qstat command
            :param job: the job to update; its m_jobRequirements attribute is modified in place
            :raises AssertionError: if a recognized field has an unexpected value
        """
        # NOTE(review): as written, fieldValue ([?]*) only matches a (possibly empty) run of '?' characters,
        # so lines whose value contains anything else are skipped; this pattern may be garbled -- confirm
        # against real "qstat -j" output before widening it, as this would activate the asserts below
        fieldRegularExp = re.compile(r'^(?P<fieldName>[^:]+):[ ]+(?P<fieldValue>[?]*)$')
        f = StringIO.StringIO(qstatOutput)
        for line in f:
            fieldMatch = fieldRegularExp.match(line)
            if not fieldMatch:
                continue
            fieldName = fieldMatch.group('fieldName')
            strFieldValue = fieldMatch.group('fieldValue')
            if fieldName == 'job_number':
                assert(job.getId().asStr() == strFieldValue)
            elif fieldName == 'hard_queue_list':
                allowedQueues = strFieldValue.split(',')
                assert(len(allowedQueues) > 0)
                job.m_jobRequirements.m_queues = allowedQueues
            elif fieldName == 'parallel environment':
                # the value could be 'ompi range: 32'
                if re.match(r'ompi range: (?P<numSlots>[0-9]+)[?]*', strFieldValue):
                    job.m_jobRequirements.m_parallelEnvironment = ParallelEnvironment.MPI
                else:
                    assert(False)
            # the other fields are ignored
        f.close()