import threading
from JobsStateUpdater import *
import Lib.Util
import Lib.SimpaDbUtil
from ClusterNode import *
import time


class ClusterStatus:
    """
    The current state (jobs, sensors) of the cluster

    @param gridEngine the interface to the batch job tool (in our case it's sun grid engine)
    """

    def __init__(self, gridEngine):
        self.m_gridEngine = gridEngine
        # maps a machine name to the ClusterNode that represents it
        self.m_clusterNodes = {}
        self.m_lock = threading.Lock()  # to prevent concurrent access to this instance
        self.m_jobsStateUpdater = JobsStateUpdater(self)
        # latest jobs state delivered through onNewJobsState(); None until the first one arrives
        self.m_jobsState = None
        # self.m_controlledMachineNames = [ 'simpatix30' ]
        self.m_controlledMachineNames = []  # [ 'simpatix30' ]
        # Deliberately disabled: kept because it records which machines must
        # not be put under control, and why.
        if False:
            for iMachine in range(11, 40):
                if (iMachine == 31) or (iMachine == 32):
                    continue  # these machines don't seem to be able to go to sleep properly (bug 00000010)
                if (iMachine == 18):
                    continue  # this machine needs maintenance (restarting because it's very slow for an unknown reason)
                self.m_controlledMachineNames.append('simpatix%d' % iMachine)
        nodeNames = Lib.SimpaDbUtil.getClusterMachinesNames()
        for nodeName in nodeNames:
            if nodeName in self.m_controlledMachineNames:
                logInfo('machine %s is under the cluster controller\'s control' % nodeName)
                clusterNode = ClusterNode(nodeName, self, gridEngine)
                if nodeName == 'simpatix10':
                    clusterNode.setShouldAlwaysBeOn()
                self.m_clusterNodes[nodeName] = clusterNode

    def setControlOnMachine(self, machineName, bControl):
        """
        Puts a machine under the control of this cluster status, or removes it.

        @param machineName the name of the machine
        @param bControl if True, a ClusterNode is created for machineName and its
            status updater thread is started (nothing happens if the machine is
            already controlled); if False, the machine's status updater thread is
            stopped and joined, and the machine is forgotten (nothing happens if
            the machine is not controlled)
        """
        if bControl:
            # add machineName under control of ClusterController
            for node in self.m_clusterNodes.values():
                if node.getName() == machineName:
                    return  # nothing to do : machineName is already under the control of ClusterController
            clusterNode = ClusterNode(machineName, self, self.m_gridEngine)
            if machineName == 'simpatix10':
                clusterNode.setShouldAlwaysBeOn()
            self.m_clusterNodes[machineName] = clusterNode
            clusterNode.m_machineStatusUpdater.start()
        else:
            # remove machineName from control of ClusterController
            clusterNode = self.m_clusterNodes.get(machineName)
            if clusterNode:
                clusterNode.m_machineStatusUpdater.m_bStop = True
                clusterNode.m_machineStatusUpdater.join()
                self.m_clusterNodes.pop(machineName)

    def getGridEngine(self):
        """ returns the grid engine interface this instance was constructed with """
        return self.m_gridEngine

    def getMachines(self):
        """ returns the dictionary (machine name -> ClusterNode) of controlled machines (the dictionary itself, not a copy) """
        return self.m_clusterNodes

    def startReadingThreads(self):
        """ starts the status updater thread of every controlled machine, then the jobs state updater thread """
        for node in self.m_clusterNodes.values():
            node.m_machineStatusUpdater.start()
        self.m_jobsStateUpdater.start()

    def stopReadingThreads(self):
        """ asks every updater thread to stop and waits for each of them to finish """
        for node in self.m_clusterNodes.values():
            node.m_machineStatusUpdater.m_bStop = True
            node.m_machineStatusUpdater.join()
        self.m_jobsStateUpdater.m_bStop = True
        self.m_jobsStateUpdater.join()

    def onNewJobsState(self, newJobsState):
        """
        Stores newJobsState as the current jobs state.

        @param newJobsState the jobs state that replaces the current one
        """
        # 'with' guarantees that the lock is released even if an exception is raised
        with self.m_lock:
            self.m_jobsState = newJobsState

    def getJobsOnMachine(self, machineName):
        """ returns what the current jobs state reports as the jobs on machineName """
        return self.m_jobsState.getJobsOnMachine(machineName)

    def isReady(self):
        """
        Tells if the status of the cluster is fully known.

        @return True if every controlled machine reports being ready and a jobs
            state has been received; False otherwise (the reason is logged)
        """
        for node in self.m_clusterNodes.values():
            if not node.isReady():
                logInfo('ClusterStatus::isReady : not ready because of ' + node.getName())
                return False
        if self.m_jobsState is None:
            logInfo('ClusterStatus::isReady : not ready because waiting for jobs state')
            return False
        return True

    def getIdleMachines(self):
        """
        Returns the controlled machines that are powered on and have no job.

        @return a dictionary (machine name -> ClusterNode)
        @raise AssertionError if this instance is not ready, or if the current
            jobs state is older than one hour (bug 00000009)
        """
        # the method has to be called : the bound method object itself is always true
        assert self.isReady()
        bBUG_00000009_IS_STILL_ALIVE = True
        if bBUG_00000009_IS_STILL_ALIVE:
            fJOBS_STATE_MAX_ALLOWED_AGE = 3600
            fJobsStateAge = time.time() - self.m_jobsState.getTime()
            if fJobsStateAge > fJOBS_STATE_MAX_ALLOWED_AGE:
                logError('ClusterStatus::getIdleMachines : age of jobs state is too old (%f s). This is bug 00000009.' % (fJobsStateAge))
                # raised explicitly so that the check survives python -O
                raise AssertionError('age of jobs state is too old (%f s). This is bug 00000009.' % (fJobsStateAge))
        idleMachines = {}
        for machineName, machine in self.m_clusterNodes.items():
            if machine.getPowerState() == PowerState.ON:
                jobsOnThisMachine = self.getJobsOnMachine(machineName)
                if len(jobsOnThisMachine) == 0:
                    idleMachines[machineName] = machine
        return idleMachines

    def getPendingJobs(self):
        """ returns what the current jobs state reports as pending jobs """
        return self.m_jobsState.getPendingJobs()

    def getJobsState(self):
        """ returns the current jobs state (None if none has been received yet) """
        return self.m_jobsState

    def queueMachineFitsJobRequirements(self, queueMachine, jobRequirements):
        """
        Tells if queueMachine is allowed to run a job that has the given requirements.

        @param queueMachine the queue machine to test
        @param jobRequirements the requirements of the job; only its m_queues
            attribute is examined here
        @return False if jobRequirements.m_queues is non-empty and doesn't contain
            the queue name of queueMachine; True otherwise
        """
        if jobRequirements.m_queues:
            machineQueueName = queueMachine.getQueueName()
            bQueueIsInAllowedQueues = any(queueName == machineQueueName for queueName in jobRequirements.m_queues)
            if not bQueueIsInAllowedQueues:
                logInfo('queueMachineFitsJobRequirements : queueMachine ' + queueMachine.getName() + ' rejected because it\'s not in the allowed queues')
                return False
        return True

    def getEnergyConsumption(self):
        """
        returns an estimate of the energy consumption since the start of the cluster controller (in joules)

        Machines that are not ready are ignored.
        """
        fEnergyConsumption = 0.0
        for machine in self.m_clusterNodes.values():
            if machine.isReady():
                fEnergyConsumption += machine.getEnergyConsumption()
        return fEnergyConsumption

    def getEnergySavings(self):
        """
        returns an estimate of the energy saving since the start of the cluster controller (in joules)

        Machines that are not ready are ignored.
        """
        fEnergySavings = 0.0
        for machine in self.m_clusterNodes.values():
            if machine.isReady():
                fEnergySavings += machine.getEnergySavings()
        return fEnergySavings

    def getCurrentPowerConsumption(self):
        """ returns the sum of the power consumptions reported by the controlled machines that are ready """
        fPowerConsumption = 0.0
        for machine in self.m_clusterNodes.values():
            if machine.isReady():
                fPowerConsumption += machine.getPowerConsumption()
        return fPowerConsumption

    def getCurrentPowerSavings(self):
        """
        returns, summed over the controlled machines that are ready, the difference
        between the power consumption of the machine in the ON power state and its
        current power consumption
        """
        fPowerSavings = 0.0
        for machine in self.m_clusterNodes.values():
            if machine.isReady():
                fPowerSavings += machine.getPowerConsumptionForPowerState(PowerState.ON) - machine.getPowerConsumption()
        return fPowerSavings

    def getNumControlledSlots(self):
        """ returns the total number of slots of the controlled machines, according to the current jobs state """
        # 'with' guarantees that the lock is released even if an exception is raised
        with self.m_lock:
            iNumControlledSlots = 0
            for machine in self.m_clusterNodes.values():
                queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
                iNumControlledSlots += queueMachine.getNumSlots()
        return iNumControlledSlots

    def getNumUsedSlots(self):
        """ returns the number of slots of the controlled machines that are not free, according to the current jobs state """
        with self.m_lock:
            iNumUsedSlots = 0
            for machine in self.m_clusterNodes.values():
                queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
                iNumUsedSlotsOnThisMachine = queueMachine.getNumSlots() - self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
                assert iNumUsedSlotsOnThisMachine >= 0
                iNumUsedSlots += iNumUsedSlotsOnThisMachine
        return iNumUsedSlots

    def getNumWastedSlots(self):
        """ returns the number of free slots on the controlled machines whose power state is ON """
        with self.m_lock:
            iNumWastedSlots = 0
            for machine in self.m_clusterNodes.values():
                if machine.getPowerState() == PowerState.ON:
                    queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
                    iNumWastedSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
        return iNumWastedSlots

    def getNumSleepingSlots(self):
        """ returns the number of free slots on the controlled machines whose power state is SLEEP """
        with self.m_lock:
            iNumSleepingSlots = 0
            for machine in self.m_clusterNodes.values():
                if machine.getPowerState() == PowerState.SLEEP:
                    queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
                    iNumSleepingSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
        return iNumSleepingSlots