cocluto/ClusterController/SlotAllocator.py

142 lines
7.6 KiB
Python
Raw Normal View History

from PowerState import *
from Log import *
import time
import copy
class Slot:
    """
    describes slots on a queue machine that are allocated to a job

    all three fields start out as None.
    """
    def __init__( self ):
        # m_job : job for which this slot is allocated
        self.m_queueMachine, self.m_numSlots, self.m_job = None, None, None
class SlotAllocator:
    """
    a class that defines a strategy for allocating free slots for the given pending jobs

    this is an abstract base class : subclasses must override getMachinesThatNeedWakeUp.
    """
    def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ):
        """
        returns the machines that need to wake up to make pending jobs running

        :param pendingJobs: the jobs waiting for free slots
        :param clusterState: the current state of the cluster
        :return: a dict mapping machine name -> machine (in the implementations of this file)
        :raises NotImplementedError: always; this method is abstract and must be overridden
        """
        # raise rather than assert( False ) : asserts are stripped when python runs with -O,
        # which would make this abstract method silently return None
        raise NotImplementedError( 'SlotAllocator::getMachinesThatNeedWakeUp is abstract and must be overridden' )
class SimpleSlotAllocator( SlotAllocator ):
    """
    a slot allocator that looks for free slots for the first job of pendingJobs, first on queue machines
    whose machine is already running, then on queue machines whose machine is asleep, and reports the
    sleeping machines that would have to be woken up.
    """

    def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ):
        """
        returns the machines that need to wake up so that the highest priority pending job can run

        :param pendingJobs: dict of pending jobs; its first value is treated as the highest priority job
            (NOTE(review): this relies on pendingJobs iterating in priority order - confirm with the caller)
        :param clusterState: the current cluster state
        :return: a dict mapping machine name -> machine for the sleeping machines to wake up. It is empty if
            there is no pending job, if running machines already provide enough slots, or if there are not
            enough slots even when counting the sleeping machines.
        """
        if not pendingJobs:
            # nothing to place, hence nothing to wake up
            return {}
        # next(iter(...)) rather than values()[0] : dict views are not indexable in python 3
        highestPriorityPendingJob = next( iter( pendingJobs.values() ) )
        logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : looking for free slots for job ' + highestPriorityPendingJob.getId().asStr() )
        jobsState = clusterState.getJobsState()
        numFreeSlots = {}  # contains the number of free slots for each queueMachine
        for queueMachine in jobsState.getQueueMachines().values():
            numFreeSlots[ queueMachine ] = jobsState.getNumFreeSlotsOnQueueMachine( queueMachine )
            logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : init numFreeSlots[ %s ] with %d ' % (queueMachine.getName(), numFreeSlots[ queueMachine ]) )
        remainingNumSlotsToAllocate = highestPriorityPendingJob.m_jobRequirements.m_numSlots
        logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate )
        # first look in running machines if there are available slots
        remainingNumSlotsToAllocate, _ = self._allocateSlotsOnMachines( clusterState, highestPriorityPendingJob, numFreeSlots, remainingNumSlotsToAllocate, PowerState.ON, 'already running' )
        machinesThatNeedWakeUp = {}
        if remainingNumSlotsToAllocate > 0:
            # now look into machines that are asleep
            remainingNumSlotsToAllocate, machinesThatNeedWakeUp = self._allocateSlotsOnMachines( clusterState, highestPriorityPendingJob, numFreeSlots, remainingNumSlotsToAllocate, PowerState.SLEEP, 'sleeping' )
        if remainingNumSlotsToAllocate != 0:
            return {}  # not enough slots available
        return machinesThatNeedWakeUp

    def _allocateSlotsOnMachines( self, clusterState, job, numFreeSlots, remainingNumSlotsToAllocate, powerState, powerStateDescription ):
        """
        greedily allocates slots for job on the queue machines whose machine is in the given power state

        :param numFreeSlots: dict queueMachine -> number of free slots; updated in place
        :param powerStateDescription: text inserted in the log message (eg 'sleeping')
        :return: a tuple (remainingNumSlotsToAllocate, machinesUsed), where machinesUsed maps machine name -> machine
            for every machine on which at least one slot was allocated
        """
        machinesUsed = {}
        controlledMachines = clusterState.getMachines()
        for queueMachine in clusterState.getJobsState().getQueueMachines().values():
            logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.getName() )
            if queueMachine.getMachineName() not in controlledMachines:
                # this machine is not under the cluster controller's control : its power state is unknown,
                # so don't count on it (looking it up would raise a KeyError)
                continue
            machine = controlledMachines[ queueMachine.getMachineName() ]
            if machine.getPowerState() != powerState:
                continue
            if not clusterState.queueMachineFitsJobRequirements( queueMachine, job.m_jobRequirements ):
                continue
            numSlotsAllocatedOnThisMachine = min( numFreeSlots[ queueMachine ], remainingNumSlotsToAllocate )
            logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : found %d slots on %s %s ' % (numSlotsAllocatedOnThisMachine, powerStateDescription, queueMachine.getMachineName() ) )
            remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine
            numFreeSlots[ queueMachine ] -= numSlotsAllocatedOnThisMachine
            if numSlotsAllocatedOnThisMachine > 0:
                # only report machines that actually provide slots, so that no useless machine gets woken up
                machinesUsed[ machine.getName() ] = machine
            logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate )
            assert( remainingNumSlotsToAllocate >= 0 )
            if remainingNumSlotsToAllocate == 0:
                break
        return remainingNumSlotsToAllocate, machinesUsed
class DecoupledSlotAllocator( SlotAllocator ):
    """
    a slot allocator that doesn't know much about sge, and does not attempt to guess what sge's scheduler would do.
    Instead, it uses a very simple strategy : it wakes up all the sleeping machines whenever the set of jobs has
    changed (and, optionally, periodically) to allow pending jobs to get in.
    """

    def __init__( self ):
        self.m_delayBetweenPeriodicChecks = -1  # in seconds. Disable periodic checks by setting this to -1
        self.m_lastCheckTime = time.time()
        self.m_lastClusterState = None  # snapshot of the cluster state seen at the previous call of getMachinesThatNeedWakeUp

    def jobsStateHasChanged( self, newClusterState ):
        """
        returns true if there is a change in the cluster state that can cause a pending job
        to start (provided all machines are enabled)

        the jobs of newClusterState are compared with those of the last recorded cluster state : a job that
        appeared or a job that disappeared (finished) both count as a change.
        """
        oldJobs = {}
        if self.m_lastClusterState is not None:
            oldJobs = self.m_lastClusterState.m_jobsState.m_jobs
        newJobs = newClusterState.m_jobsState.m_jobs
        bJobsHaveChanged = False
        oldJobsOnly = oldJobs.copy()  # shallow copy; ends up containing the jobs that have disappeared
        for newJob in newJobs.values():
            jobId = newJob.getId()
            if jobId in oldJobs:
                oldJobsOnly.pop( jobId, None )
            else:
                # ah ... a new job has arrived
                logInfo('A new job (jobId =%s) has been detected ' % jobId.asStr() )
                bJobsHaveChanged = True
        for oldJob in oldJobsOnly.values():
            logInfo('Job (jobId =%s) has finished' % oldJob.getId().asStr() )
            # at least one old job has finished, freeing some slots
            bJobsHaveChanged = True
        return bJobsHaveChanged

    def _snapshotClusterState( self, clusterState ):
        """
        returns a copy of clusterState whose jobs dict is independent from clusterState's one

        a plain copy.copy( clusterState ) shares clusterState.m_jobsState (and thus its m_jobs dict) with the
        live state : if the caller updates that jobs state in place, the old and new jobs compared by
        jobsStateHasChanged would be the very same dict and no change would ever be detected.
        """
        snapshot = copy.copy( clusterState )
        snapshot.m_jobsState = copy.copy( clusterState.m_jobsState )
        snapshot.m_jobsState.m_jobs = clusterState.m_jobsState.m_jobs.copy()
        return snapshot

    def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ):
        """
        returns the sleeping machines to wake up so that pending jobs get a chance to start

        :param pendingJobs: not used by this strategy
        :param clusterState: the current cluster state
        :return: a dict mapping machine name -> machine. It contains every sleeping machine under the cluster
            controller's control if the jobs have changed since the previous call or if a periodic check is
            due; otherwise it is empty.
        """
        machinesThatNeedWakeUp = {}
        bJobsStateHasChanged = self.jobsStateHasChanged( clusterState )
        currentTime = time.time()
        # we do periodic checks to detect changes in cluster state that are not detected by jobsStateHasChanged
        # for example changes in the requirements, in the allocation policy, etc...
        bItsTimeForPeriodicCheck = False
        if self.m_delayBetweenPeriodicChecks > 0:
            bItsTimeForPeriodicCheck = (currentTime - self.m_lastCheckTime) > self.m_delayBetweenPeriodicChecks
        if bJobsStateHasChanged or bItsTimeForPeriodicCheck:
            if bJobsStateHasChanged:
                logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep because jobs state has changed')
            else:
                logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep for periodic check (to be sure pending jobs get a chance to start)')
            controlledMachines = clusterState.getMachines()
            for queueMachine in clusterState.getJobsState().getQueueMachines().values():
                machineName = queueMachine.getMachineName()
                if machineName in controlledMachines:
                    # this means that the machine is under the cluster controller's control
                    machine = controlledMachines[ machineName ]
                    if machine.getPowerState() == PowerState.SLEEP:
                        machinesThatNeedWakeUp[ machine.getName() ] = machine
            # the periodic check delay is measured from the last time machines were woken up
            self.m_lastCheckTime = currentTime
        self.m_lastClusterState = self._snapshotClusterState( clusterState )
        return machinesThatNeedWakeUp