2011-10-07 17:43:45 +02:00
#!/usr/bin/env python
import sys
sys . path . insert ( 0 , ' .. ' )
import os
from Lib . Util import *
from Lib . SimpaDbUtil import *
import time
from ClusterStatus import ClusterStatus
from SlotAllocator import *
from Log import *
from ClusterNodeStatusUpdater import *
from SunGridEngine import SunGridEngine
import Util
2012-06-26 18:08:36 +02:00
from WebServer import WebServerThread
2011-10-07 17:43:45 +02:00
from HTMLParser import HTMLParser
# version of the cluster controller program ; it is logged at startup and stored in the program_version column of the sessions_desc table
VERSION = ' 1.18 '
class MyHTMLParser(HTMLParser):
    """
    html parser that collects, in document order, every piece of text data
    that is not empty once its surrounding whitespace has been removed
    """

    def __init__(self):
        # explicit base class call (and not super) because HTMLParser is an old-style class in python 2
        HTMLParser.__init__(self)
        self.TokenList = []

    def handle_data(self, data):
        """
        callback invoked by HTMLParser for each chunk of text ; stores the stripped text if there's anything left
        """
        strippedText = data.strip()
        if not strippedText:
            return  # whitespace-only chunk : nothing worth recording
        self.TokenList.append(strippedText)

    def GetTokenList(self):
        """
        returns the list of tokens collected so far (the list itself, not a copy)
        """
        return self.TokenList
class WakeUpCompleteNotifier(IWakeUpCompleteNotifier):
    """
    notifier handed to a machine's wake up request : when the wake up has completed,
    it forwards the event to the cluster controller, along with the machine's name
    """

    def __init__(self, machineName, clusterController):
        self.m_clusterController = clusterController
        self.m_machineName = machineName

    def onWakeUpComplete(self):
        """
        logs the event, then tells the cluster controller that the machine's wake up has completed
        """
        logDebug(' WakeUpCompleteNotifier::onWakeUpComplete : start ')
        controller = self.m_clusterController
        controller.onMachineWakeUpComplete(self.m_machineName)
class SleepCompleteNotifier(ISleepCompleteNotifier):
    """
    notifier handed to a machine's sleep request : when the sleep request has been handled,
    it forwards the outcome to the cluster controller, along with the machine's name
    """

    def __init__(self, machineName, clusterController):
        self.m_clusterController = clusterController
        self.m_machineName = machineName

    def onSleepComplete(self, bSleepSucceeded):
        """
        logs the event, then tells the cluster controller whether the machine could be put to sleep

        bSleepSucceeded : passed unchanged to ClusterController.onMachineSleepComplete
        """
        logDebug(' SleepCompleteNotifier::onSleepComplete : start ')
        controller = self.m_clusterController
        controller.onMachineSleepComplete(self.m_machineName, bSleepSucceeded)
def jouleToKwh(fEnergyInJoules):
    """
    returns the given energy (expressed in joules) expressed in kilowatt-hours
    """
    # a kWh is 1000 watts during 3600 seconds
    fNumJoulesInOneKwh = 1000.0 * 3600.0
    return fEnergyInJoules / fNumJoulesInOneKwh
class ClusterController:
    """
    The cluster controller monitors the cluster's activity and has multiple purposes :
    - energy saving : it can put some machines to sleep if they have nothing to do, or it
      can wake them up when needed (eg when a new job has arrived)
    - auto-repair : for examples
        - it happened sometimes that sge_execd process disappeared for some unknown reason
          in that case, the cluster controller can detect it and restart the daemon
          automatically, without administrator's intervention
        - clear the Error state of queues
    - it could also be used to dynamically adapt sge's settings to the requirements of
      jobs (eg add some machines to a queue).
    Mechanism to let user get priority
    """

    def __init__(self):
        gridEngine = SunGridEngine()
        self.m_clusterStatus = ClusterStatus(gridEngine)
        self.m_slotAllocator = DecoupledSlotAllocator()  # SimpleSlotAllocator()
        self.m_machinesThatNeedWakeUp = {}  # machine name -> machine, for the wake up requests that have not completed yet
        self.m_machinesThatNeedWakeupLock = threading.Lock()  # to prevent concurrent access to m_machinesThatNeedWakeUp
        self.m_machinesThatNeedSleeping = {}  # machine name -> machine, for the sleep requests that have not completed yet
        self.m_machinesThatNeedSleepingLock = threading.Lock()  # to prevent concurrent access to m_machinesThatNeedSleeping
        self.m_lastEnergyStatusLogTime = None
        self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60  # in seconds
        self.m_iSessionId = None  # session (run) identifier in database
        self.m_webServer = WebServerThread(self)
        self.m_bStop = False  # the main loop of run() ends when this becomes True
        self.m_bStopLock = threading.Lock()  # to prevent concurrent access to m_bStop

    def getClusterStatus(self):
        """
        returns the ClusterStatus instance this controller works on
        """
        return self.m_clusterStatus

    def log(self, message):
        """
        prints the given message on the standard output
        """
        print(message)

    def shutdownLeastImportantNode(self):
        """
        not implemented yet : only logs that it has been called
        """
        self.log(" ClusterController::shutdownLeastImportantNode : start ")

    def onMachineWakeUpComplete(self, machineName):
        """
        callback (see WakeUpCompleteNotifier) : removes the machine from the pending wake ups

        raises KeyError if machineName is not in the pending wake ups
        """
        self.m_machinesThatNeedWakeupLock.acquire()
        try:
            del self.m_machinesThatNeedWakeUp[machineName]
        finally:
            # release the lock even if the machine is unknown, otherwise getNumPendingWakeUps would block forever
            self.m_machinesThatNeedWakeupLock.release()
        logDebug(' ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it \' s now awake ' % machineName)

    def onMachineSleepComplete(self, machineName, bSleepSucceeded):
        """
        callback (see SleepCompleteNotifier) : removes the machine from the pending sleeps

        bSleepSucceeded : only used to choose the message that gets logged
        raises KeyError if machineName is not in the pending sleeps
        """
        self.m_machinesThatNeedSleepingLock.acquire()
        try:
            del self.m_machinesThatNeedSleeping[machineName]
        finally:
            # release the lock even if the machine is unknown, otherwise getNumPendingSleeps would block forever
            self.m_machinesThatNeedSleepingLock.release()
        if bSleepSucceeded:
            logDebug(' ClusterController::onMachineSleepComplete : removed %s from the list of machines that need sleeping because it \' s now sleeping ' % machineName)
        else:
            logDebug(' ClusterController::onMachineSleepComplete : removed %s from the list of machines that need sleeping because it can \' t be put to sleep at the moment (eg a job just arrived) ' % machineName)

    def getNumPendingWakeUps(self):
        """
        returns the number of wake up requests that have not completed yet
        """
        self.m_machinesThatNeedWakeupLock.acquire()
        try:
            return len(self.m_machinesThatNeedWakeUp)
        finally:
            self.m_machinesThatNeedWakeupLock.release()

    def getNumPendingSleeps(self):
        """
        returns the number of sleep requests that have not completed yet
        """
        self.m_machinesThatNeedSleepingLock.acquire()
        try:
            return len(self.m_machinesThatNeedSleeping)
        finally:
            self.m_machinesThatNeedSleepingLock.release()

    def putIdleMachinesToSleep(self):
        """
        requests sleep for every idle machine that is powered on (except the sge master),
        then blocks until all these requests have been handled
        """
        self.m_machinesThatNeedToSleep = []  # never read in this file ; kept in case something else reads it
        idleMachinesToSleep = {}
        self.m_clusterStatus.m_lock.acquire()
        try:
            idleMachines = self.m_clusterStatus.getIdleMachines()
            for machineName, idleMachine in idleMachines.iteritems():
                if idleMachine.getPowerState() == PowerState.ON:
                    if idleMachine.getName() != ' simpatix10 ':  # never put simpatix10 to sleep because it's the sge master and is also server for other things
                        idleMachinesToSleep[idleMachine.getName()] = idleMachine
        finally:
            self.m_clusterStatus.m_lock.release()

        # the sleep callbacks remove entries from m_machinesThatNeedSleeping under this lock, so it's also held here
        self.m_machinesThatNeedSleepingLock.acquire()
        try:
            self.m_machinesThatNeedSleeping.update(idleMachinesToSleep)
            listOfMachinesThatNeedSleeping = list(self.m_machinesThatNeedSleeping.values())  # duplicate the list so that we don't iterate on m_machinesThatNeedSleeping, which could cause a runtime error because callbacks alter it
        finally:
            self.m_machinesThatNeedSleepingLock.release()

        # the lock is released before the requests are made, because their completion callback acquires it
        for machine in listOfMachinesThatNeedSleeping:
            logInfo(' ClusterController::putIdleMachinesToSleep : requesting sleep for %s because it \' s idle ' % machine.getName())
            machine.requestSleep(SleepCompleteNotifier(machine.getName(), self))
        if len(listOfMachinesThatNeedSleeping) != 0:
            # hack : wait until the sleep requests are handled so that we don't request the same machine to sleep multiple times
            while self.getNumPendingSleeps() > 0:
                time.sleep(1)

    def wakeUpMachinesForPendingJobs(self):
        """
        asks the slot allocator which machines are needed for the pending jobs, requests their wake up,
        blocks until they're all awake, then gives sge some time to allocate slots
        """
        listOfMachinesThatNeedWakeUp = []
        self.m_clusterStatus.m_lock.acquire()
        try:
            pendingJobs = self.m_clusterStatus.getPendingJobs()
            if len(pendingJobs) != 0:
                machinesThatNeedWakeUp = self.m_slotAllocator.getMachinesThatNeedWakeUp(pendingJobs, self.m_clusterStatus)
                # the wake up callbacks remove entries from m_machinesThatNeedWakeUp under this lock, so it's also held here
                self.m_machinesThatNeedWakeupLock.acquire()
                try:
                    self.m_machinesThatNeedWakeUp = machinesThatNeedWakeUp
                    listOfMachinesThatNeedWakeUp = list(machinesThatNeedWakeUp.values())  # duplicate the list so that we don't iterate on m_machinesThatNeedWakeUp, which would cause a runtime error because callbacks alter m_machinesThatNeedWakeUp
                finally:
                    self.m_machinesThatNeedWakeupLock.release()
                # the wake up lock is released before the requests are made, because their completion callback acquires it
                for machine in listOfMachinesThatNeedWakeUp:
                    logInfo(' ClusterController::wakeUpMachinesForPendingJobs : requesting wake up for ' + machine.getName())
                    machine.requestWakeUp(WakeUpCompleteNotifier(machine.getName(), self))
        finally:
            self.m_clusterStatus.m_lock.release()

        if len(listOfMachinesThatNeedWakeUp) != 0:
            # hack : wait until the wakeup requests are handled so that a later sleep request doesn't cancel it
            # and also wait for the jobs to come in
            while self.getNumPendingWakeUps() > 0:
                time.sleep(1)
            iSGE_CHEK_RUNNABLE_JOBS_DELAY = 60 * 5  # max time it takes for sge between the fact that a queued job is runnable and SGE actually starting it (I've put a long time here because sometimes, qstat takes a long time to realise that the machine is available after I wake it up)
            logInfo(' ClusterController::wakeUpMachinesForPendingJobs : all required machines are awake. Now give %d seconds to SGE to allocate slots. ' % iSGE_CHEK_RUNNABLE_JOBS_DELAY)
            # wait until SGE has a chance to allocate slots
            time.sleep(iSGE_CHEK_RUNNABLE_JOBS_DELAY)  # note : this is annoying because it blocks the main thread. This could be improved if we forbid the machines to go to sleep for that much time....
            logInfo(' ClusterController::wakeUpMachinesForPendingJobs : end of the delay given to SGE to allocate slots ')

    def updateNormalState(self):
        """
        one iteration of the controller's job : put idle machines to sleep, then wake up the machines needed by pending jobs
        """
        # attempt to shut down machines that are idle
        self.putIdleMachinesToSleep()
        # wake up necessary machines if there are pending jobs
        self.wakeUpMachinesForPendingJobs()

    def storeSessionInDatabase(self):
        """
        creates the database rows that describe this session (run) and returns the session identifier
        """
        conn = MySQLdb.connect(' simpatix10 ', ' clusterctrl ', ' ', ' clustercontroller ')
        assert(conn)
        try:
            # retrieve the session id, as it's an auto_increment field
            # NOTE(review): the id is read before the row is inserted, which assumes that nothing else inserts into sessions_desc in between
            sqlCommand = " SELECT AUTO_INCREMENT FROM information_schema.TABLES WHERE TABLE_SCHEMA = ' clustercontroller ' AND TABLE_NAME = ' sessions_desc ' "
            print(sqlCommand)
            conn.query(sqlCommand)
            r = conn.store_result()
            iSessionId = r.fetch_row()[0][0]
            # stores information about the session
            sqlCommand = " INSERT INTO `sessions_desc` (`start_time`, end_time, `program_version`, `machine_name`, `pid`, num_controlled_machines) VALUES (NOW(), NOW(), ' %s ' , ' simpatix10 ' , %d , %d ); " % (VERSION, os.getpid(), len(self.m_clusterStatus.m_clusterNodes))
            print(sqlCommand)
            conn.query(sqlCommand)
            # initialize the energy savings table
            sqlCommand = " INSERT INTO session_to_energy_savings (session_id, energy_savings_kwh) VALUES ( %d ,0.0); " % (iSessionId)
            print(sqlCommand)
            conn.query(sqlCommand)
        finally:
            # don't leave the connection open if one of the queries fails
            conn.close()
        print(' Session Iid = %d ' % iSessionId)
        return iSessionId

    def updateSessionEnergyConsumptionInDatabase(self):
        """
        stores in the database the energy saved so far during this session, and sets the session's end time to now
        """
        conn = MySQLdb.connect(' simpatix10 ', ' root ', ' ', ' clustercontroller ')
        assert(conn)
        try:
            # update energy savings for the current session
            sqlCommand = " UPDATE session_to_energy_savings SET energy_savings_kwh= %f WHERE session_id= %d ; " % (jouleToKwh(self.m_clusterStatus.getEnergySavings()), self.m_iSessionId)
            print(sqlCommand)
            conn.query(sqlCommand)
            # update the end time of the current session
            sqlCommand = " UPDATE sessions_desc SET end_time=NOW() WHERE session_id= %d ; " % (self.m_iSessionId)
            print(sqlCommand)
            conn.query(sqlCommand)
        finally:
            # don't leave the connection open if one of the queries fails
            conn.close()

    def setControlOnMachine(self, machineName, bControl):
        """
        adds or removes the control of ClusterController on the given machine
        """
        self.m_clusterStatus.setControlOnMachine(machineName, bControl)

    def _logEnergyStatus(self, startTime):
        """
        logs the number of machines and slots in each state along with the power and energy estimates,
        then stores the session's energy savings in the database

        startTime : the time (as returned by time.localtime) at which the energy measurements started
        """
        iNumMachines = len(self.m_clusterStatus.m_clusterNodes)
        iNumMachinesOn = 0
        iNumSleepingMachines = 0
        for machine in self.m_clusterStatus.m_clusterNodes.itervalues():
            ePowerState = machine.getPowerState()
            if ePowerState == PowerState.ON:
                iNumMachinesOn += 1
            elif ePowerState == PowerState.SLEEP:
                iNumSleepingMachines += 1
        logInfo(' %d machines ( %d ON, %d SLEEPING) ' % (iNumMachines, iNumMachinesOn, iNumSleepingMachines))
        iNumSlots = self.m_clusterStatus.getNumControlledSlots()
        iNumUsedSlots = self.m_clusterStatus.getNumUsedSlots()
        iNumWastedSlots = self.m_clusterStatus.getNumWastedSlots()
        iNumSleepingSlots = self.m_clusterStatus.getNumSleepingSlots()
        logInfo(' %d slots ( %d used, %d wasted, %d sleeping) ' % (iNumSlots, iNumUsedSlots, iNumWastedSlots, iNumSleepingSlots))
        logInfo(' cluster estimated power consumption : %f W (saving from cluster controller : %f W) ' % (self.m_clusterStatus.getCurrentPowerConsumption(), self.m_clusterStatus.getCurrentPowerSavings()))
        logInfo(' cluster estimated energy consumption since %s : %f kWh (saving from cluster controller : %f kWh) ' % (time.asctime(startTime), jouleToKwh(self.m_clusterStatus.getEnergyConsumption()), jouleToKwh(self.m_clusterStatus.getEnergySavings())))
        self.updateSessionEnergyConsumptionInDatabase()

    def run(self):
        """
        main loop : registers the session in the database, starts the reading threads and the web server,
        waits for the cluster status to be ready, then keeps updating the cluster's state until m_bStop is set
        """
        self.m_iSessionId = self.storeSessionInDatabase()
        # note : log is not defined in this file ; presumably it comes from one of the star imports -- TODO confirm
        log(" storeSessionInDatabase completed ")
        DELAY_BETWEEN_MEASURES = 10  # in seconds
        self.m_clusterStatus.startReadingThreads()
        self.m_webServer.start()
        while not self.m_clusterStatus.isReady():
            log(' waiting for system to be ready ')
            time.sleep(1)
        logInfo(' ClusterController::run : cluster initial readings have completed ')
        startTime = time.localtime()
        while not self.m_bStop:
            currentTime = time.time()
            #clusterStatus.m_nodesStatus['simpatix10'].dump()
            # the energy status is only logged every DELAY_BETWEEN_ENERGY_STATUS_LOGS seconds
            if (not self.m_lastEnergyStatusLogTime) or (currentTime > (self.m_lastEnergyStatusLogTime + self.DELAY_BETWEEN_ENERGY_STATUS_LOGS)):
                self._logEnergyStatus(startTime)
                self.m_lastEnergyStatusLogTime = currentTime
            self.updateNormalState()
            time.sleep(DELAY_BETWEEN_MEASURES)
        self.m_clusterStatus.stopReadingThreads()
def storeClusterNodeStatus(clusterNodeStatus):
    """
    stores in the simpa_measurements database the current reading of each sensor of the given cluster node status

    clusterNodeStatus : object whose m_clusterNodeName and m_sensors attributes are read ; each sensor
        is expected to be either a 'Fan' (m_rpms is stored) or a 'Temperature' (m_temperature is stored)
    raises AssertionError if a sensor is of any other type
    """
    #conn = MySQLdb.connect('simpatix10', 'measures_writer', '', 'simpa_measurements')
    conn = MySQLdb.connect(' simpatix10 ', ' root ', ' ', ' simpa_measurements ')
    assert(conn)
    try:
        for sensor in clusterNodeStatus.m_sensors.itervalues():
            sensorId = clusterNodeStatus.m_clusterNodeName + ' _ ' + sensor.m_name
            if sensor.typeName() == ' Fan ':
                sqlCommand = """ INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ( ' """ + sensorId + """ ' , """ + str(sensor.m_rpms) + """ , NOW()); """
            elif sensor.typeName() == ' Temperature ':
                sqlCommand = """ INSERT INTO `temperature_logs` (`temp_sensor_id`, `temperature`, `date`) VALUES ( ' """ + sensorId + """ ' , """ + str(sensor.m_temperature) + """ , NOW()); """
            else:
                assert(False)  # unexpected type of sensor
            print(sqlCommand)
            conn.query(sqlCommand)
    finally:
        # don't leave the connection open if a query fails or if an unexpected sensor is encountered
        conn.close()
# script entry point : creates the cluster controller and runs its main loop ; any exception that escapes is handed to Util.onException
# NOTE(review): the string literal compared with __name__ is padded with spaces, which looks like an extraction artifact ;
# as written it can't be equal to '__main__' -- confirm against the original file
if __name__ == ' __main__ ' :
    #Lib.Util.sendTextMail( 'SimpaCluster <guillaume.raffy@univ-rennes1.fr>', 'guillaume.raffy@univ-rennes1.fr', 'mail subject', 'mail content')
    try :
        logInfo ( ' ClusterController v. %s starting.... ' % VERSION )
        #executeCommand('ping -o -t 1 simpatix310 > /dev/null')
        #print executeCommand('ssh simpatix10 "ipmitool sensor"')
        #assert False, 'prout'
        controller = ClusterController ( )
        # run() only returns once the controller's m_bStop flag has been set
        controller . run ( )
        #machineNameToMacAddress( 'simpatix10' )
    #except AssertionError, error:
    #except KeyboardInterrupt, error:
    except BaseException , exception : # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
        Util . onException ( exception )