# cocluto/ClusterController/ClusterStatus.py
# (185 lines, 6.6 KiB, Python)

import threading
from JobsStateUpdater import *
import Lib.Util
import Lib.SimpaDbUtil
from ClusterNode import *
import time
class ClusterStatus:
    """
    The current state (jobs, sensors) of the cluster

    Holds one ClusterNode per controlled machine (m_clusterNodes) and the most
    recent jobs state handed over through onNewJobsState() (m_jobsState).

    @param gridEngine the interface to the batch job tool (in our case it's sun grid engine)
    """

    def __init__(self, gridEngine):
        self.m_gridEngine = gridEngine
        self.m_clusterNodes = {}  # node name -> ClusterNode
        self.m_lock = threading.Lock()  # to prevent concurrent access to this instance
        self.m_jobsStateUpdater = JobsStateUpdater(self)
        self.m_jobsState = None  # stays None until onNewJobsState() has been called
        self.m_controlledMachineNames = ['simpatix30']
        #self.m_controlledMachineNames = [] # [ 'simpatix30' ]
        # Deliberately disabled: flip to True to put simpatix11..simpatix39
        # (minus the excluded machines below) under the controller's control.
        if False:
            for iMachine in range(11, 40):
                if (iMachine == 31) or (iMachine == 32):
                    continue  # these machines don't seem to be able to go to sleep properly (bug 00000010)
                if (iMachine == 18):
                    continue  # this machine needs maintenance (restarting because it's very slow for an unknown reason)
                self.m_controlledMachineNames.append('simpatix%d' % iMachine)
        nodeNames = Lib.SimpaDbUtil.getClusterMachinesNames()
        for nodeName in nodeNames:
            if nodeName in self.m_controlledMachineNames:
                logInfo('machine %s is under the cluster controller\'s control' % nodeName)
                clusterNode = ClusterNode(nodeName, self, gridEngine)
                if nodeName == 'simpatix10':
                    clusterNode.setShouldAlwaysBeOn()
                self.m_clusterNodes[nodeName] = clusterNode

    def getGridEngine(self):
        """
        @return the grid engine interface that was passed to the constructor
        """
        return self.m_gridEngine

    def getMachines(self):
        """
        @return the dictionary (node name -> ClusterNode) of the controlled machines
        """
        return self.m_clusterNodes

    def startReadingThreads(self):
        """
        starts the status updater of each controlled machine, then the jobs state updater
        """
        for clusterNode in self.m_clusterNodes.values():
            clusterNode.m_machineStatusUpdater.start()
        self.m_jobsStateUpdater.start()

    def stopReadingThreads(self):
        """
        asks each updater to stop (by setting its m_bStop flag) and waits for it to finish
        """
        for clusterNode in self.m_clusterNodes.values():
            clusterNode.m_machineStatusUpdater.m_bStop = True
            clusterNode.m_machineStatusUpdater.join()
        self.m_jobsStateUpdater.m_bStop = True
        self.m_jobsStateUpdater.join()

    def onNewJobsState(self, newJobsState):
        """
        replaces the stored jobs state with newJobsState, while holding m_lock

        @param newJobsState the jobs state that becomes the current one
        """
        #logDebug( 'ClusterStatus::onNewJobsState : attempting to acquire lock to access m_jobsState' )
        with self.m_lock:
            #logDebug( 'ClusterStatus::onNewJobsState : got lock to access m_jobsState' )
            self.m_jobsState = newJobsState

    def getJobsOnMachine(self, machineName):
        """
        @param machineName the name of the machine to query
        @return what the current jobs state reports as the jobs on the given machine
        """
        return self.m_jobsState.getJobsOnMachine(machineName)

    def isReady(self):
        """
        @return True if every controlled machine reports it is ready and a jobs
            state has been received; False (after logging the reason) otherwise
        """
        for clusterNode in self.m_clusterNodes.values():
            if not clusterNode.isReady():
                logInfo('ClusterStatus::isReady : not ready because of ' + clusterNode.getName())
                return False
            #log('ClusterStatus::isReady() : '+k+' is ready')
        #assert( False )
        if self.m_jobsState is None:
            logInfo('ClusterStatus::isReady : not ready because waiting for jobs state')
            return False
        return True

    def getIdleMachines(self):
        """
        @return a dictionary (machine name -> ClusterNode) of the controlled
            machines that are in the ON power state and have no job on them

        Raises AssertionError if the cluster status is not ready yet, or if the
        jobs state is older than one hour (bug 00000009).
        """
        # isReady must be *called*: asserting on the bound method itself is always true
        assert self.isReady(), 'ClusterStatus::getIdleMachines called while the cluster status is not ready'
        bBUG_00000009_IS_STILL_ALIVE = True
        if bBUG_00000009_IS_STILL_ALIVE:
            currentTime = time.time()
            fJOBS_STATE_MAX_ALLOWED_AGE = 3600
            fJobsStateAge = currentTime - self.m_jobsState.getTime()
            if fJobsStateAge > fJOBS_STATE_MAX_ALLOWED_AGE:
                logError('ClusterStatus::getIdleMachines : age of jobs state is too old (%f s). This is bug 00000009.' % (fJobsStateAge))
                assert False
        idleMachines = {}
        for machineName, machine in self.m_clusterNodes.items():
            if machine.getPowerState() == PowerState.ON:
                jobsOnThisMachine = self.getJobsOnMachine(machineName)
                if len(jobsOnThisMachine) == 0:
                    idleMachines[machineName] = machine
        return idleMachines

    def getPendingJobs(self):
        """
        @return what the current jobs state reports as pending jobs
        """
        return self.m_jobsState.getPendingJobs()

    def getJobsState(self):
        """
        @return the most recently received jobs state (None if none was received yet)
        """
        return self.m_jobsState

    def queueMachineFitsJobRequirements(self, queueMachine, jobRequirements):
        """
        @param queueMachine the queue machine to test
        @param jobRequirements the requirements of the job; only its m_queues attribute is used
        @return False if jobRequirements.m_queues is non-empty and doesn't contain
            the queue name of queueMachine; True otherwise
        """
        if jobRequirements.m_queues:
            if queueMachine.getQueueName() not in jobRequirements.m_queues:
                logInfo('queueMachineFitsJobRequirements : queueMachine '+queueMachine.getName()+' rejected because it\'s not in the allowed queues')
                return False
        return True

    def getEnergyConsumption(self):
        """
        returns an estimate of the energy consumption since the start of the cluster controller (in joules)
        """
        fEnergyConsumption = 0.0
        for machine in self.m_clusterNodes.values():
            fEnergyConsumption += machine.getEnergyConsumption()
        return fEnergyConsumption

    def getEnergySavings(self):
        """
        returns an estimate of the energy saving since the start of the cluster controller (in joules)
        """
        fEnergySavings = 0.0
        for machine in self.m_clusterNodes.values():
            fEnergySavings += machine.getEnergySavings()
        return fEnergySavings

    def getCurrentPowerConsumption(self):
        """
        @return the sum of the power consumptions reported by the controlled machines
        """
        fPowerConsumption = 0.0
        for machine in self.m_clusterNodes.values():
            fPowerConsumption += machine.getPowerConsumption()
        return fPowerConsumption

    def getCurrentPowerSavings(self):
        """
        @return the sum, over the controlled machines, of the difference between
            the machine's power consumption in the ON state and its current one
        """
        fPowerSavings = 0.0
        for machine in self.m_clusterNodes.values():
            fPowerSavings += machine.getPowerConsumptionForPowerState(PowerState.ON) - machine.getPowerConsumption()
        return fPowerSavings

    def getNumControlledSlots(self):
        """
        @return the total number of slots of the controlled machines, according to the current jobs state
        """
        # 'with' guarantees the lock is released even if the jobs state lookup raises
        with self.m_lock:
            iNumControlledSlots = 0
            for machine in self.m_clusterNodes.values():
                queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
                iNumControlledSlots += queueMachine.getNumSlots()
        return iNumControlledSlots

    def getNumUsedSlots(self):
        """
        @return the number of slots of the controlled machines that are not free
            (number of slots minus number of free slots), according to the current jobs state
        """
        # 'with' guarantees the lock is released even if the assertion below fails
        with self.m_lock:
            iNumUsedSlots = 0
            for machine in self.m_clusterNodes.values():
                queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
                iNumUsedSlotsOnThisMachine = queueMachine.getNumSlots() - self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
                assert iNumUsedSlotsOnThisMachine >= 0
                iNumUsedSlots += iNumUsedSlotsOnThisMachine
        return iNumUsedSlots

    def getNumWastedSlots(self):
        """
        @return the number of free slots on the controlled machines that are in the ON power state
        """
        with self.m_lock:
            iNumWastedSlots = 0
            for machine in self.m_clusterNodes.values():
                if machine.getPowerState() == PowerState.ON:
                    queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
                    iNumWastedSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
        return iNumWastedSlots

    def getNumSleepingSlots(self):
        """
        @return the number of free slots on the controlled machines that are in the SLEEP power state
        """
        with self.m_lock:
            iNumSleepingSlots = 0
            for machine in self.m_clusterNodes.values():
                if machine.getPowerState() == PowerState.SLEEP:
                    queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
                    iNumSleepingSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
        return iNumSleepingSlots