from PowerState import *
from Log import *
import time
import copy


class Slot:
    """
    a record describing slots for a job : the queue machine they are on, how many they are, and the job they are for
    """
    def __init__( self ):
        self.m_queueMachine = None  # the queue machine holding the slots
        self.m_numSlots = None  # the number of slots
        self.m_job = None  # job for which this slot is allocated


class SlotAllocator:
    """
    a class that defines a strategy for allocating free slots for the given pending jobs
    """
    def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ):
        """
        returns the machines that need to wake up to make pending jobs running, as a dict (machine name -> machine)

        this method is abstract : subclasses must override it
        """
        # raise rather than assert : asserts are stripped when python runs with -O, which would silently return None
        raise NotImplementedError( 'SlotAllocator::getMachinesThatNeedWakeUp is abstract' )


class SimpleSlotAllocator( SlotAllocator ):
    """
    a slot allocator that looks for enough free slots for the first pending job, first on machines that are
    already running, then on machines that are asleep, and reports the sleeping machines it had to use
    """
    def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ):
        """
        returns the machines that need to wake up so that the highest priority pending job can start

        pendingJobs : dict of pending jobs ; its first value is treated as the highest priority job
            (NOTE(review): this relies on the iteration order of pendingJobs being priority order - confirm with caller)
        clusterState : the current state of the cluster

        returns a dict (machine name -> machine). It is empty when there is no pending job, when the running
        machines already provide enough slots, or when not enough slots can be found even after waking up
        every suitable sleeping machine.
        """
        if not pendingJobs:
            return {}  # no pending job : nothing needs to wake up
        highestPriorityPendingJob = pendingJobs.values()[0]
        logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : looking for free slots for job ' + highestPriorityPendingJob.getId().asStr() )

        jobsState = clusterState.getJobsState()
        numFreeSlots = {}  # contains the number of free slots for each queueMachine
        for queueMachine in jobsState.getQueueMachines().itervalues():
            numFreeSlots[ queueMachine ] = jobsState.getNumFreeSlotsOnQueueMachine( queueMachine )
            logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : init numFreeSlots[ %s ] with %d ' % (queueMachine.getName(), numFreeSlots[ queueMachine ]) )

        jobRequirements = highestPriorityPendingJob.m_jobRequirements
        remainingNumSlotsToAllocate = jobRequirements.m_numSlots
        logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate )

        # first look in running machines if there are available slots (using them requires no wake up)
        remainingNumSlotsToAllocate, runningMachinesUsed = self._allocateSlotsOnMachinesInPowerState( PowerState.ON, 'already running', jobRequirements, clusterState, numFreeSlots, remainingNumSlotsToAllocate )

        machinesThatNeedWakeUp = {}
        if remainingNumSlotsToAllocate > 0:
            # now look into machines that are asleep : each one that provides slots has to wake up
            remainingNumSlotsToAllocate, machinesThatNeedWakeUp = self._allocateSlotsOnMachinesInPowerState( PowerState.SLEEP, 'sleeping', jobRequirements, clusterState, numFreeSlots, remainingNumSlotsToAllocate )

        if remainingNumSlotsToAllocate != 0:
            return {}  # not enough slots available
        return machinesThatNeedWakeUp

    def _allocateSlotsOnMachinesInPowerState( self, powerState, powerStateDescription, jobRequirements, clusterState, numFreeSlots, remainingNumSlotsToAllocate ):
        """
        greedily allocates slots on the queue machines that fit jobRequirements and whose machine is in powerState

        powerStateDescription : text used in log messages to describe machines in powerState (eg 'sleeping')
        numFreeSlots : dict (queueMachine -> number of free slots) ; decremented in place by the allocated slots
        remainingNumSlotsToAllocate : the number of slots still to be found

        returns a tuple (remainingNumSlotsToAllocate, machinesUsed) where machinesUsed is a dict
        (machine name -> machine) of the machines that provided at least one slot
        """
        machinesUsed = {}
        machines = clusterState.getMachines()
        for queueMachine in clusterState.getJobsState().getQueueMachines().itervalues():
            if remainingNumSlotsToAllocate <= 0:
                break  # all the requested slots have been found
            logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.getName() )
            machineName = queueMachine.getMachineName()
            if machineName not in machines:
                # the machine is not under the cluster controller's control, so its power state is unknown
                logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : ignoring queueMachine %s because machine %s is not under the cluster controller\'s control' % (queueMachine.getName(), machineName) )
                continue
            machine = machines[ machineName ]
            if machine.getPowerState() != powerState:
                continue
            if not clusterState.queueMachineFitsJobRequirements( queueMachine, jobRequirements ):
                continue
            numSlotsAllocatedOnThisMachine = min( numFreeSlots[ queueMachine ], remainingNumSlotsToAllocate )
            logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : found %d slots on %s %s ' % (numSlotsAllocatedOnThisMachine, powerStateDescription, machineName ) )
            remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine
            numFreeSlots[ queueMachine ] -= numSlotsAllocatedOnThisMachine
            if numSlotsAllocatedOnThisMachine > 0:
                # only report machines that actually contribute slots : waking a machine that provides none is useless
                machinesUsed[ machine.getName() ] = machine
            logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate )
            assert( remainingNumSlotsToAllocate >= 0 )
        return remainingNumSlotsToAllocate, machinesUsed


class DecoupledSlotAllocator( SlotAllocator ):
    """
    a slot allocator that doesn't know much about sge, and does not attempt to guess what sge's scheduler would do.
    Instead, it uses a very simple strategy : it wakes up all the sleeping machines when the set of jobs changes
    (and optionally periodically) to allow jobs to get in.
    """
    def __init__( self ):
        self.m_delayBetweenPeriodicChecks = -1  # in seconds. Disable periodic checks by setting this to -1
        self.m_lastCheckTime = time.time()  # time of the last wake up check
        self.m_lastClusterState = None  # shallow copy of the cluster state seen at the previous call
        # snapshot of the jobs dict seen at the previous call. It is a copy of the dict itself (not of the cluster
        # state) so that in-place updates of the live jobs dict cannot alias it
        self.m_lastJobs = {}

    def jobsStateHasChanged( self, newClusterState ):
        """
        returns True if a job has appeared or disappeared since the previous call to getMachinesThatNeedWakeUp,
        ie a change that can cause a pending job to start (provided all machines are enabled)

        before any call to getMachinesThatNeedWakeUp, every current job counts as new
        """
        oldJobs = self.m_lastJobs
        newJobs = newClusterState.m_jobsState.m_jobs
        bJobsHaveChanged = False
        oldJobsOnly = oldJobs.copy()  # shallow copy ; jobs still present are removed below
        for newJob in newJobs.itervalues():
            if newJob.getId() in oldJobs:
                del oldJobsOnly[newJob.getId()]
            else:
                # ah ... a new job has arrived
                logInfo('A new job (jobId =%s) has been detected ' % newJob.getId().asStr() )
                bJobsHaveChanged = True
        for oldJob in oldJobsOnly.itervalues():
            logInfo('Job (jobId =%s) has finished' % oldJob.getId().asStr() )
            # at least one old job has finished, freeing some slots
            bJobsHaveChanged = True
        return bJobsHaveChanged

    def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ):
        """
        returns all the sleeping machines under the cluster controller's control, as a dict (machine name -> machine),
        when the jobs have changed or when a periodic check is due ; returns an empty dict otherwise

        pendingJobs is not used by this strategy (kept for interface compatibility with SlotAllocator)
        """
        machinesThatNeedWakeUp = {}
        bJobsStateHasChanged = self.jobsStateHasChanged( clusterState )
        currentTime = time.time()
        # we do periodic checks to detect changes in cluster state that are not detected by jobsStateHasChanged
        # for example changes in the requirements, in the allocation policy, etc...
        bItsTimeForPeriodicCheck = False
        if self.m_delayBetweenPeriodicChecks > 0:
            bItsTimeForPeriodicCheck = (currentTime - self.m_lastCheckTime) > self.m_delayBetweenPeriodicChecks
        if bJobsStateHasChanged or bItsTimeForPeriodicCheck:
            if bJobsStateHasChanged:
                logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep because jobs state has changed')
            else:
                logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep for periodic check (to be sure pending jobs get a chance to start)')
            machines = clusterState.getMachines()
            for queueMachine in clusterState.getJobsState().getQueueMachines().itervalues():
                machineName = queueMachine.getMachineName()
                if machineName in machines:
                    # this means that the machine is under the cluster controller's control
                    machine = machines[ machineName ]
                    if machine.getPowerState() == PowerState.SLEEP:
                        machinesThatNeedWakeUp[ machine.getName() ] = machine
            # only a check that actually ran resets the periodic timer ; resetting it on every call would prevent
            # the periodic check from ever firing when calls are more frequent than m_delayBetweenPeriodicChecks
            self.m_lastCheckTime = currentTime
        self.m_lastClusterState = copy.copy(clusterState)
        # snapshot the jobs dict itself : copy.copy(clusterState) above still shares its jobs state with the live object
        self.m_lastJobs = clusterState.m_jobsState.m_jobs.copy()
        return machinesThatNeedWakeUp