142 lines
7.6 KiB
Python
142 lines
7.6 KiB
Python
from PowerState import *
|
|
from Log import *
|
|
import time
|
|
import copy
|
|
|
|
class Slot:
|
|
def __init__( self ):
|
|
self.m_queueMachine = None
|
|
self.m_numSlots = None
|
|
self.m_job = None # job for which this slot is allocated
|
|
|
|
class SlotAllocator:
|
|
"""
|
|
a class that defines a strategy for allocating free slots for the given pending jobs
|
|
"""
|
|
def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ):
|
|
"""
|
|
returns the list of machines that need to wake up to make pending jobs running
|
|
"""
|
|
assert( False ) # this method is abstract
|
|
|
|
class SimpleSlotAllocator( SlotAllocator ):
|
|
def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ):
|
|
machinesThatNeedWakeUp = {}
|
|
highestPriorityPendingJob = pendingJobs.values()[0]
|
|
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : looking for free slots for job ' + highestPriorityPendingJob.getId().asStr() )
|
|
numFreeSlots = {} # contains the number of free slots for each queueMachine
|
|
for queueMachine in clusterState.getJobsState().getQueueMachines().itervalues():
|
|
numFreeSlots[ queueMachine ] = clusterState.getJobsState().getNumFreeSlotsOnQueueMachine( queueMachine )
|
|
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : init numFreeSlots[ %s ] with %d ' % (queueMachine.getName(), numFreeSlots[ queueMachine ]) )
|
|
remainingNumSlotsToAllocate = highestPriorityPendingJob.m_jobRequirements.m_numSlots
|
|
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate )
|
|
# first look in running machines if there are available slots
|
|
for queueMachine in clusterState.getJobsState().getQueueMachines().itervalues():
|
|
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.getName() )
|
|
machine = clusterState.getMachines()[ queueMachine.getMachineName() ]
|
|
if machine.getPowerState() == PowerState.ON:
|
|
if clusterState.queueMachineFitsJobRequirements( queueMachine, highestPriorityPendingJob.m_jobRequirements ):
|
|
numSlotsAllocatedOnThisMachine = min( numFreeSlots[ queueMachine ], remainingNumSlotsToAllocate )
|
|
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : found %d slots on already running %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.getMachineName() ) )
|
|
|
|
remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine
|
|
numFreeSlots[ queueMachine ] -= numSlotsAllocatedOnThisMachine
|
|
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate )
|
|
assert( remainingNumSlotsToAllocate >= 0 )
|
|
if remainingNumSlotsToAllocate == 0:
|
|
break
|
|
if remainingNumSlotsToAllocate > 0:
|
|
# now look into machines that are asleep
|
|
for queueMachine in clusterState.getJobsState().getQueueMachines().itervalues():
|
|
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.getName() )
|
|
machine = clusterState.getMachines()[ queueMachine.getMachineName() ]
|
|
if machine.getPowerState() == PowerState.SLEEP:
|
|
if clusterState.queueMachineFitsJobRequirements( queueMachine, highestPriorityPendingJob.m_jobRequirements ):
|
|
numSlotsAllocatedOnThisMachine = min( numFreeSlots[ queueMachine ], remainingNumSlotsToAllocate )
|
|
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : found %d slots on sleeping %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.getMachineName() ) )
|
|
remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine
|
|
numFreeSlots[ queueMachine ] -= numSlotsAllocatedOnThisMachine
|
|
machinesThatNeedWakeUp[ machine.getName() ] = machine
|
|
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate )
|
|
assert( remainingNumSlotsToAllocate >= 0 )
|
|
if remainingNumSlotsToAllocate == 0:
|
|
break
|
|
if remainingNumSlotsToAllocate != 0:
|
|
return {} # not enough slots available
|
|
return machinesThatNeedWakeUp
|
|
|
|
class DecoupledSlotAllocator( SlotAllocator ):
|
|
"""
|
|
a slot allocator that doesn't know much about sge, and does not attempts to guess what sge'sceduler would do
|
|
Instead, it uses a very simple strategy : it wakes up all the machines periodically to allow jobs to get in.
|
|
"""
|
|
def __init__( self ):
|
|
self.m_delayBetweenPeriodicChecks = -1 # in seconds. Disable periodic checks by setting this to -1
|
|
self.m_lastCheckTime = time.time()
|
|
self.m_lastClusterState = None
|
|
def jobsStateHasChanged( self, newClusterState ):
|
|
"""
|
|
returns true if there is a change in the cluster state that can cause a pending job
|
|
to start (provided all machines are enabled)
|
|
"""
|
|
oldJobs = {}
|
|
if self.m_lastClusterState:
|
|
oldJobs = self.m_lastClusterState.m_jobsState.m_jobs
|
|
newJobs = newClusterState.m_jobsState.m_jobs
|
|
bJobsHaveChanged = False
|
|
oldJobsOnly = oldJobs.copy() # shallow copy
|
|
#print 'oldJobs : ', oldJobs
|
|
#print 'newJobs : ', newJobs
|
|
"""
|
|
print 'self.m_lastClusterState', self.m_lastClusterState
|
|
print 'newClusterState', newClusterState
|
|
if self.m_lastClusterState:
|
|
print 'self.m_lastClusterState.m_jobsState', self.m_lastClusterState.m_jobsState
|
|
print 'newClusterState.m_jobsState', newClusterState.m_jobsState
|
|
print 'id(self.m_lastClusterState) : ', id(self.m_lastClusterState)
|
|
print 'id(newClusterState) : ', id(newClusterState)
|
|
print 'len(oldJobs) : ', len(oldJobs)
|
|
print 'len(newJobs) : ', len(newJobs)
|
|
print 'id(oldJobs) : ', id(oldJobs)
|
|
print 'id(newJobs) : ', id(newJobs)
|
|
"""
|
|
for newJob in newJobs.itervalues():
|
|
#logDebug('DecoupledSlotAllocator::jobsStateHasChanged newJob id=%s' % newJob.getId().asStr())
|
|
if newJob.getId() in oldJobs:
|
|
#logDebug('DecoupledSlotAllocator::jobsStateHasChanged job id=%d is in old jobs' % newJob.getId())
|
|
del oldJobsOnly[newJob.getId()]
|
|
else:
|
|
# ah ... a new job has arrived
|
|
logInfo('A new job (jobId =%s) has been detected ' % newJob.getId().asStr() )
|
|
bJobsHaveChanged = True
|
|
if len(oldJobsOnly) != 0:
|
|
for oldJob in oldJobsOnly.itervalues():
|
|
logInfo('Job (jobId =%s) has finished' % oldJob.getId().asStr() )
|
|
# at least one old job has finished, freeing some slots
|
|
bJobsHaveChanged = True
|
|
return bJobsHaveChanged
|
|
def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ):
|
|
machinesThatNeedWakeUp = {}
|
|
bJobsStateHasChanged = self.jobsStateHasChanged( clusterState )
|
|
currentTime = time.time()
|
|
# we do periodic checks to detect changes in cluster state that are not detected by jobsStateHasChanged
|
|
# for example changes in the requirements, in the allocation policy, etc...
|
|
bItsTimeForPeriodicCheck = False
|
|
if self.m_delayBetweenPeriodicChecks > 0:
|
|
bItsTimeForPeriodicCheck = (currentTime - self.m_lastCheckTime) > self.m_delayBetweenPeriodicChecks
|
|
if bJobsStateHasChanged or bItsTimeForPeriodicCheck:
|
|
if bJobsStateHasChanged:
|
|
logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep because jobs state has changed')
|
|
else:
|
|
logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep for periodic check (to be sure pending jobs get a chance to start)')
|
|
for queueMachine in clusterState.getJobsState().getQueueMachines().itervalues():
|
|
if queueMachine.getMachineName() in clusterState.getMachines():
|
|
# this means that the machine is under the cluster controller's control
|
|
machine = clusterState.getMachines()[ queueMachine.getMachineName() ]
|
|
if machine.getPowerState() == PowerState.SLEEP:
|
|
machinesThatNeedWakeUp[ machine.getName() ] = machine
|
|
self.m_lastCheckTime = currentTime
|
|
self.m_lastClusterState = copy.copy(clusterState)
|
|
#print 'self.m_lastClusterState', self.m_lastClusterState
|
|
return machinesThatNeedWakeUp
|