class JobStateFlags: RUNNING=1 # the job is running WAITING=2 # the job is waiting QUEUED=4 # not sure what that exactly means but it reflects the q state of jobs as seen in the pending jobs list from qstat -f -u \* TRANSFERING=8 DELETED=16 HOLD=32 ERROR=64 SUSPENDED=128 class ParallelEnvironment: MPI=1 class JobRequirements: def __init__( self ): self.m_numSlots = None self.m_strArchitecture = None # machine architecture self.m_parallelEnvironment = None self.m_queues = None # the list of queues this job is allowed to run on class JobId: """ the identifier of a job. We treat each element of a job array as a separate job A single integer is no longer enough to identify a job because all elements in a job array share the same sge job identifier. To uniquely define a job array element, we also use the task id. """ def __init__( self, iJobId, iJobArrayElementId = None): self.m_iJobId = iJobId self.m_iJobArrayElementId = iJobArrayElementId # None if this identifier does not refer to a job array element def __hash__( self ): """ required to use a JobId as a dict hash key """ return self.m_iJobId # very simple hashing that conflicts only for job array elements def __eq__( self, other ): """ required to use a JobId as a dict hash key """ if self.m_iJobId != other.m_iJobId: return False if self.m_iJobArrayElementId != other.m_iJobArrayElementId: return False return True def isJobArrayElement( self ): return (self.m_iJobArrayElementId != None) def getMainId(self): return self.m_iJobId def asStr( self ): strResult = '%s' % self.m_iJobId if self.isJobArrayElement(): strResult += '.%d' % self.m_iJobArrayElementId return strResult class Job: def __init__( self, jobId ): self.m_jobId = jobId self.m_startTime = None self.m_submitTime = None self.m_owner = None self.m_scriptName = None self.m_slots = {} self.m_stateFlags = 0 self.m_jobRequirements = JobRequirements() self.m_requestedRamPerCore = 0 def getId( self ): return self.m_jobId def setState( self, state ): self.m_stateFlags = state def setOwner( self, jobOwner ): if self.m_owner: assert( self.m_owner == jobOwner ) self.m_owner = jobOwner def getOwner( self ): return self.m_owner def setStartTime( self, jobStartTime ): if self.m_startTime: assert( self.m_startTime == jobStartTime ) self.m_startTime = jobStartTime def setSubmitTime( self, jobSubmitTime ): if self.m_submitTime: assert( self.m_submitTime == jobSubmitTime ) self.m_submitTime = jobSubmitTime def getStartTime( self ): return self.m_startTime def setScriptName( self, jobScriptName ): if self.m_scriptName: assert( self.m_scriptName == jobScriptName ) self.m_scriptName = jobScriptName def addSlots( self, queueMachineName, numSlots ): assert( self.m_slots.get( queueMachineName ) == None ) if self.m_slots.get( queueMachineName ) == None: self.m_slots[ queueMachineName ] = numSlots else: # should never happen self.m_slots[ queueMachineName ] += numSlots def getSlots( self ): return self.m_slots def setNumRequiredSlots( self, numSlots ): self.m_jobRequirements.m_numSlots = numSlots def isPending( self ): """ returns true if this job is waiting in the queue for whatever reason """ return self.m_stateFlags & JobStateFlags.QUEUED def getRequestedRamPerCore( self ): """ requested RAM per core in bytes """ return self.m_requestedRamPerCore def setRequestedRamPerCore( self, requestedRam ): """ requestedRam : requested RAM per core in bytes """ self.m_requestedRamPerCore=requestedRam