141 lines
5.5 KiB
Python
141 lines
5.5 KiB
Python
|
import threading
|
||
|
from PowerState import *
|
||
|
from ClusterNodeStatusUpdater import *
|
||
|
import Lib.Util
|
||
|
import Lib.SimpaDbUtil
|
||
|
|
||
|
from datetime import *
|
||
|
|
||
|
class ClusterNode:
    """
    the state of a machine node

    Holds the node's current power state, the time at which it was last
    set or accounted for, and running estimates (in joules) of the energy
    consumed by the node and of the energy saved compared to leaving the
    node in the ON state. Sleep and wake-up requests are forwarded to the
    node's ClusterNodeStatusUpdater.
    """

    def __init__(self, machineName, cluster, gridEngine):
        """
        machineName : name of the machine this node represents
        cluster     : the cluster this machine belongs to
        gridEngine  : passed as is to the ClusterNodeStatusUpdater
        """
        self.m_name = machineName
        self.m_cluster = cluster  # the cluster this machine belongs to
        self.m_requestedPowerState = PowerState.ON
        self.m_powerState = PowerState.UNKNOWN
        # time at which the last value of self.m_powerState has been set
        # (None as long as the power state has never been set)
        self.m_lastPowerStateTime = None
        self.m_machineStatusUpdater = ClusterNodeStatusUpdater(machineName, self, gridEngine)
        # estimate of the energy consumption of this machine since the
        # start of cluster controller (in joules)
        self.m_energyConsumption = 0.0
        # estimate of the energy savings on this machine caused by the
        # cluster controller since it started (in joules)
        self.m_energySavings = 0.0

    def getName(self):
        """
        returns the name of this machine
        """
        return self.m_name

    def isReady(self):
        """
        returns True as soon as the power state of this node is known
        (any state other than PowerState.UNKNOWN)
        """
        return self.m_powerState != PowerState.UNKNOWN

    def getPowerState(self):
        """
        returns the current power state of this node
        """
        return self.m_powerState

    def setShouldAlwaysBeOn(self):
        """
        tells the status updater that this node should always be on, and
        sets the node's power state to ON
        """
        self.m_machineStatusUpdater.setShouldAlwaysBeOn()
        self.setPowerState(PowerState.ON)

    def setPowerState(self, powerState):
        """
        sets the power state of this node

        When the state actually changes (or is set for the first time),
        the status updater's check flags are adjusted : the power state is
        always checked, sensors are only checked when the node is ON.
        """
        bUpdateRequiredChecks = False
        if self.m_powerState == PowerState.UNKNOWN:
            logInfo('ClusterNode::setPowerState : ' + self.m_name + '\'s power state has been initialized to ' + PowerStateToStr(powerState))
            self.m_powerState = powerState
            self.m_lastPowerStateTime = datetime.now()
            bUpdateRequiredChecks = True
        else:
            # update the estimation of energy consumption while the old
            # power state is still in place
            self.updateEnergyMeasurements()
            # then change the power state
            if self.m_powerState != powerState:
                logInfo('ClusterNode::setPowerState : ' + self.m_name + '\'s power state has been changed to ' + PowerStateToStr(powerState))
                self.m_powerState = powerState
                self.m_lastPowerStateTime = datetime.now()
                bUpdateRequiredChecks = True

        if bUpdateRequiredChecks:
            updater = self.m_machineStatusUpdater
            if self.m_powerState == PowerState.ON:
                updater.m_bCheckPowerState = True
                updater.m_bCheckSensors = True
            elif self.m_powerState == PowerState.OFF:
                updater.m_bCheckPowerState = True
                updater.m_bCheckSensors = False
            elif self.m_powerState == PowerState.SLEEP:
                updater.m_bCheckPowerState = True
                updater.m_bCheckSensors = False
            elif self.m_powerState == PowerState.UNPLUGGED:
                updater.m_bCheckPowerState = True
                updater.m_bCheckSensors = False
            else:
                assert False, 'unexpected power state'

    def onNewPowerStateReading(self, powerState):
        """
        called when a new powerstate reading arrives

        A reading that differs from the current state is applied; if the
        current state was already known, a warning is logged first.
        """
        if powerState != self.getPowerState():
            if self.getPowerState() != PowerState.UNKNOWN:
                logWarning('ClusterNode::onNewPowerStateReading : ' + self.m_name + '\'s power state has been (manually it seems) changed to ' + PowerStateToStr(powerState))
            self.setPowerState(powerState)

    def getPowerConsumptionForPowerState(self, ePowerState):
        """
        returns the power consumption estimation (in watts) of this machine for the given power state

        The estimate is a fixed current intensity per power state
        multiplied by a fixed voltage.
        """
        fCurrentIntensity = 0.0
        fCurrentVoltage = 220.0
        # noticed on 26.08.2009 that putting 22 machines from sleep to on
        # eats 17 A, resulting in difference of 0.77 A per machine
        if ePowerState == PowerState.ON:
            fCurrentIntensity = 0.9  # value when the machine is doing nothing
        elif ePowerState == PowerState.OFF:
            fCurrentIntensity = 0.1
        elif ePowerState == PowerState.SLEEP:
            fCurrentIntensity = 0.1
        elif ePowerState == PowerState.UNPLUGGED:
            fCurrentIntensity = 0.0
        else:
            assert False, 'no power consumption estimate for this power state'
        return fCurrentIntensity * fCurrentVoltage

    def updateEnergyMeasurements(self):
        """
        adds to the energy estimates (in joules) the energy consumed and
        saved since the last update, then restarts the measurement
        interval from now

        Does nothing as long as the power state has never been set, since
        there is no reference time to measure from yet.
        """
        if self.m_lastPowerStateTime is None:
            return
        now = datetime.now()
        # total_seconds() keeps the days and the sub-second part of the
        # interval; the reference time is reset on every call, so anything
        # truncated here would be lost for good
        elapsedSeconds = (now - self.m_lastPowerStateTime).total_seconds()
        fCurrentPower = self.getPowerConsumptionForPowerState(self.m_powerState)
        fPowerWhenOn = self.getPowerConsumptionForPowerState(PowerState.ON)
        self.m_energyConsumption += fCurrentPower * elapsedSeconds
        self.m_energySavings += (fPowerWhenOn - fCurrentPower) * elapsedSeconds
        self.m_lastPowerStateTime = now

    def getEnergyConsumption(self):
        """
        in joules
        """
        self.updateEnergyMeasurements()
        return self.m_energyConsumption

    def getPowerConsumption(self):
        """
        returns the power consumption estimation (in watts) of this
        machine for its current power state
        """
        return self.getPowerConsumptionForPowerState(self.m_powerState)

    def getEnergySavings(self):
        """
        in joules
        """
        self.updateEnergyMeasurements()
        return self.m_energySavings

    def onSleepFailedBecauseAJobJustArrived(self):
        """
        logs that the scheduled sleep of this machine has been canceled
        """
        logInfo('%s was scheduled to sleep but the sleep is canceled because it\'s currently executing a new job' % self.m_name)

    def requestSleep(self, sleepCompleteNotifier = None):
        """
        forwards the sleep request (and its optional notifier) to the
        status updater
        """
        self.m_machineStatusUpdater.requestSleep(sleepCompleteNotifier)

    def requestWakeUp(self, wakeUpCompleteNotifier = None):
        """
        forwards the wake-up request (and its optional notifier) to the
        status updater
        """
        self.m_machineStatusUpdater.requestWakeUp(wakeUpCompleteNotifier)

    def getQueueMachineName(self):
        """
        returns the name of the queue machine that the cluster's jobs
        state associates with this machine
        """
        return self.getCluster().getJobsState().getQueueMachine(self.m_name).getName()

    def getCluster(self):
        """
        returns the cluster this machine belongs to
        """
        return self.m_cluster
|