cocluto/ClusterController/Job.py

122 lines
3.5 KiB
Python

class JobStateFlags:
RUNNING=1 # the job is running
WAITING=2 # the job is waiting
QUEUED=4 # not sure what that exactly means but it reflects the q state of jobs as seen in the pending jobs list from qstat -f -u \*
TRANSFERING=8
DELETED=16
HOLD=32
ERROR=64
SUSPENDED=128
class ParallelEnvironment:
MPI=1
class JobRequirements:
def __init__( self ):
self.m_numSlots = None
self.m_strArchitecture = None # machine architecture
self.m_parallelEnvironment = None
self.m_queues = None # the list of queues this job is allowed to run on
class JobId:
"""
the identifier of a job.
We treat each element of a job array as a separate job
A single integer is no longer enough to identify a job because all elements in a job array
share the same sge job identifier. To uniquely define a job array element, we also use the task id.
"""
def __init__( self, iJobId, iJobArrayElementId = None):
self.m_iJobId = iJobId
self.m_iJobArrayElementId = iJobArrayElementId # None if this identifier does not refer to a job array element
def __hash__( self ):
"""
required to use a JobId as a dict hash key
"""
return self.m_iJobId # very simple hashing that conflicts only for job array elements
def __eq__( self, other ):
"""
required to use a JobId as a dict hash key
"""
if self.m_iJobId != other.m_iJobId:
return False
if self.m_iJobArrayElementId != other.m_iJobArrayElementId:
return False
return True
def isJobArrayElement( self ):
return (self.m_iJobArrayElementId != None)
def asStr( self ):
strResult = '%s' % self.m_iJobId
if self.isJobArrayElement():
strResult += '.%d' % self.m_iJobArrayElementId
return strResult
class Job:
def __init__( self, jobId ):
self.m_jobId = jobId
self.m_startTime = None
self.m_submitTime = None
self.m_owner = None
self.m_scriptName = None
self.m_slots = {}
self.m_stateFlags = 0
self.m_jobRequirements = JobRequirements()
self.m_requestedRamPerCore = 0
def getId( self ):
return self.m_jobId
def setState( self, state ):
self.m_stateFlags = state
def setOwner( self, jobOwner ):
if self.m_owner:
assert( self.m_owner == jobOwner )
self.m_owner = jobOwner
def getOwner( self ):
return self.m_owner
def setStartTime( self, jobStartTime ):
if self.m_startTime:
assert( self.m_startTime == jobStartTime )
self.m_startTime = jobStartTime
def setSubmitTime( self, jobSubmitTime ):
if self.m_submitTime:
assert( self.m_submitTime == jobSubmitTime )
self.m_submitTime = jobSubmitTime
def getStartTime( self ):
return self.m_startTime
def setScriptName( self, jobScriptName ):
if self.m_scriptName:
assert( self.m_scriptName == jobScriptName )
self.m_scriptName = jobScriptName
def addSlots( self, queueMachineName, numSlots ):
assert( self.m_slots.get( queueMachineName ) == None )
if self.m_slots.get( queueMachineName ) == None:
self.m_slots[ queueMachineName ] = numSlots
else:
# should never happen
self.m_slots[ queueMachineName ] += numSlots
def getSlots( self ):
return self.m_slots
def setNumRequiredSlots( self, numSlots ):
self.m_jobRequirements.m_numSlots = numSlots
def isPending( self ):
"""
returns true if this job is waiting in the queue for whatever reason
"""
return self.m_stateFlags & JobStateFlags.QUEUED
def getRequestedRamPerCore( self ):
"""
requested RAM per core in bytes
"""
return self.m_requestedRamPerCore
def setRequestedRamPerCore( self, requestedRam ):
"""
requestedRam : requested RAM per core in bytes
"""
self.m_requestedRamPerCore=requestedRam