315 lines
15 KiB
Python
315 lines
15 KiB
Python
#!/usr/bin/env python
|
|
import sys
|
|
sys.path.insert(0, '..')
|
|
import os
|
|
from Lib.Util import *
|
|
from Lib.SimpaDbUtil import *
|
|
import time
|
|
from ClusterStatus import ClusterStatus
|
|
from SlotAllocator import *
|
|
from Log import *
|
|
from ClusterNodeStatusUpdater import *
|
|
from SunGridEngine import SunGridEngine
|
|
import Util
|
|
from WebServer import WebServerThread
|
|
|
|
from HTMLParser import HTMLParser
|
|
|
|
# version of the cluster controller program : logged at startup and stored with each session in the database
VERSION = '1.18'
|
|
|
|
class MyHTMLParser(HTMLParser):
    """
    html parser that collects the text found between tags as a flat list of tokens

    each piece of text is stripped of its surrounding whitespace; pieces that end up empty are ignored
    """

    def __init__(self):
        HTMLParser.__init__(self)
        self.TokenList = []  # the non-empty text tokens, in the order they were encountered

    def handle_data(self, data):
        """
        HTMLParser callback, called for each piece of text data
        """
        token = data.strip()
        if token:
            self.TokenList.append(token)

    def GetTokenList(self):
        """
        returns the list of tokens collected so far (the list itself, not a copy)
        """
        return self.TokenList
|
|
|
|
|
|
class WakeUpCompleteNotifier(IWakeUpCompleteNotifier):
    """
    notifier that forwards the 'wake up complete' event of one machine to the cluster controller
    """

    def __init__(self, machineName, clusterController):
        """
        machineName : the name of the machine this notifier is attached to
        clusterController : the object whose onMachineWakeUpComplete method gets called
        """
        self.m_machineName = machineName
        self.m_clusterController = clusterController

    def onWakeUpComplete(self):
        """
        tells the cluster controller that the wake up of this notifier's machine has completed
        """
        logDebug('WakeUpCompleteNotifier::onWakeUpComplete : start')
        controller = self.m_clusterController
        controller.onMachineWakeUpComplete(self.m_machineName)
|
|
|
|
class SleepCompleteNotifier(ISleepCompleteNotifier):
    """
    notifier that forwards the 'sleep complete' event of one machine to the cluster controller
    """

    def __init__(self, machineName, clusterController):
        """
        machineName : the name of the machine this notifier is attached to
        clusterController : the object whose onMachineSleepComplete method gets called
        """
        self.m_machineName = machineName
        self.m_clusterController = clusterController

    def onSleepComplete(self, bSleepSucceeded):
        """
        tells the cluster controller that the sleep request of this notifier's machine has been handled

        bSleepSucceeded : passed through unchanged to the cluster controller
        """
        logDebug('SleepCompleteNotifier::onSleepComplete : start')
        controller = self.m_clusterController
        controller.onMachineSleepComplete(self.m_machineName, bSleepSucceeded)
|
|
|
|
def jouleToKwh(fEnergyInJoules):
    """
    converts an energy expressed in joules into kilowatt-hours

    fEnergyInJoules : the energy to convert, in joules
    returns : the same energy, in kWh (always a float)
    """
    # 1 kWh = 1000 W * 3600 s
    fJoulesPerKwh = 1000.0 * 3600.0
    return fEnergyInJoules / fJoulesPerKwh
|
|
|
|
class ClusterController:
    """
    The cluster controller monitors the cluster's activity and has multiple purposes :
    - energy saving : it can put some machines to sleep if they have nothing to do, or it
      can wake them up when needed (eg when a new job has arrived)
    - auto-repair : for example
        - it happened sometimes that sge_execd process disappeared for some unknown reason
          in that case, the cluster controller can detect it and restart the daemon
          automatically, without administrator's intervention
        - clear the Error state of queues
    - it could also be used to dynamically adapt sge's settings to the requirements of
      jobs (eg add some machines to a queue).
    Mechanism to let user get priority
    """

    def __init__(self):
        gridEngine = SunGridEngine()
        self.m_clusterStatus = ClusterStatus(gridEngine)
        self.m_slotAllocator = DecoupledSlotAllocator()  # SimpleSlotAllocator()
        self.m_machinesThatNeedWakeUp = {}  # machines with a pending wake up request, indexed by machine name
        self.m_machinesThatNeedWakeupLock = threading.Lock()  # to prevent concurrent access to m_machinesThatNeedWakeUp
        self.m_machinesThatNeedSleeping = {}  # machines with a pending sleep request, indexed by machine name
        self.m_machinesThatNeedSleepingLock = threading.Lock()  # to prevent concurrent access to m_machinesThatNeedSleeping
        self.m_lastEnergyStatusLogTime = None  # time (as returned by time.time()) of the last energy status log; None until the first log
        self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60  # in seconds
        self.m_iSessionId = None  # session (run) identifier in database
        self.m_webServer = WebServerThread(self)
        self.m_bStop = False  # the main loop of run() ends when this becomes True
        self.m_bStopLock = threading.Lock()  # to prevent concurrent access to m_bStop

    def getClusterStatus(self):
        """
        returns the ClusterStatus object this controller works on
        """
        return self.m_clusterStatus

    def log(self, message):
        """
        prints the given message on the standard output
        """
        print(message)

    def shutdownLeastImportantNode(self):
        """
        not implemented yet : only logs that it has been called
        """
        self.log("ClusterController::shutdownLeastImportantNode : start")

    def onMachineWakeUpComplete(self, machineName):
        """
        callback (see WakeUpCompleteNotifier) : removes the given machine from the pending wake ups

        raises KeyError if the machine has no pending wake up
        """
        self.m_machinesThatNeedWakeupLock.acquire()
        try:
            del self.m_machinesThatNeedWakeUp[machineName]
        finally:
            # release the lock even if the machine is unknown : otherwise every later access to the pending wake ups would block forever
            self.m_machinesThatNeedWakeupLock.release()
        logDebug('ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it\'s now awake' % machineName)

    def onMachineSleepComplete(self, machineName, bSleepSucceeded):
        """
        callback (see SleepCompleteNotifier) : removes the given machine from the pending sleeps

        bSleepSucceeded : only used to choose the message that is logged
        raises KeyError if the machine has no pending sleep
        """
        self.m_machinesThatNeedSleepingLock.acquire()
        try:
            del self.m_machinesThatNeedSleeping[machineName]
        finally:
            # release the lock even if the machine is unknown : otherwise every later access to the pending sleeps would block forever
            self.m_machinesThatNeedSleepingLock.release()
        if bSleepSucceeded:
            logDebug('ClusterController::onMachineSleepComplete : removed %s from the list of machines that need sleeping because it\'s now sleeping' % machineName)
        else:
            logDebug('ClusterController::onMachineSleepComplete : removed %s from the list of machines that need sleeping because it can\'t be put to sleep at the moment (eg a job just arrived)' % machineName)

    def getNumPendingWakeUps(self):
        """
        returns the number of machines whose wake up request has not completed yet
        """
        self.m_machinesThatNeedWakeupLock.acquire()
        try:
            return len(self.m_machinesThatNeedWakeUp)
        finally:
            self.m_machinesThatNeedWakeupLock.release()

    def getNumPendingSleeps(self):
        """
        returns the number of machines whose sleep request has not completed yet
        """
        self.m_machinesThatNeedSleepingLock.acquire()
        try:
            return len(self.m_machinesThatNeedSleeping)
        finally:
            self.m_machinesThatNeedSleepingLock.release()

    def putIdleMachinesToSleep(self):
        """
        requests a sleep for every idle machine that is switched on (except the sge master), then
        blocks until all these requests have been handled
        """
        self.m_clusterStatus.m_lock.acquire()
        try:
            idleMachines = self.m_clusterStatus.getIdleMachines()
            # NOTE(review): this attribute is not read anywhere in this file; it looks like a leftover of m_machinesThatNeedSleeping. It is kept in case other modules use it
            self.m_machinesThatNeedToSleep = []
            for idleMachine in idleMachines.itervalues():
                # never put simpatix10 to sleep because it's the sge master and is also server for other things
                if idleMachine.getPowerState() == PowerState.ON and idleMachine.getName() != 'simpatix10':
                    self.m_machinesThatNeedSleeping[idleMachine.getName()] = idleMachine
        finally:
            # never leave the cluster status locked, even if one of the calls above fails
            self.m_clusterStatus.m_lock.release()

        # duplicate the list so that we don't iterate on m_machinesThatNeedSleeping, which could cause a runtime error because callbacks alter m_machinesThatNeedSleeping
        listOfMachinesThatNeedSleeping = list(self.m_machinesThatNeedSleeping.values())
        for machine in listOfMachinesThatNeedSleeping:
            logInfo('ClusterController::putIdleMachinesToSleep : requesting sleep for %s because it\'s idle' % machine.getName())
            machine.requestSleep(SleepCompleteNotifier(machine.getName(), self))

        if listOfMachinesThatNeedSleeping:
            # hack : wait until the sleep requests are handled so that we don't request the same machine to sleep multiple times
            while self.getNumPendingSleeps() > 0:
                time.sleep(1)

    def wakeUpMachinesForPendingJobs(self):
        """
        requests a wake up for the machines that the slot allocator deems necessary to run the pending jobs, then
        blocks until these machines are awake and sge has had some time to allocate slots on them
        """
        listOfMachinesThatNeedWakeUp = []

        self.m_clusterStatus.m_lock.acquire()
        try:
            pendingJobs = self.m_clusterStatus.getPendingJobs()
            if len(pendingJobs) != 0:
                self.m_machinesThatNeedWakeUp = self.m_slotAllocator.getMachinesThatNeedWakeUp(pendingJobs, self.m_clusterStatus)
                if len(self.m_machinesThatNeedWakeUp) != 0:
                    # duplicate the list so that we don't iterate on m_machinesThatNeedWakeUp, which would cause a runtime error because callbacks alter m_machinesThatNeedWakeUp
                    listOfMachinesThatNeedWakeUp = list(self.m_machinesThatNeedWakeUp.values())
                    for machine in listOfMachinesThatNeedWakeUp:
                        logInfo('ClusterController::wakeUpMachinesForPendingJobs : requesting wake up for '+machine.getName() )
                        machine.requestWakeUp(WakeUpCompleteNotifier(machine.getName(), self))
        finally:
            # never leave the cluster status locked, even if one of the calls above fails
            self.m_clusterStatus.m_lock.release()

        if listOfMachinesThatNeedWakeUp:
            # hack : wait until the wakeup requests are handled so that a later sleep request doesn't cancel it
            # and also wait for the jobs to come in
            while self.getNumPendingWakeUps() > 0:
                time.sleep(1)
            # max time it takes for sge between the fact that a queued job is runnable and SGE actually starting it
            # (I've put a long time here because sometimes, qstat takes a long time to realise that the machine is available after I wake it up)
            iSGE_CHEK_RUNNABLE_JOBS_DELAY = 60 * 5
            logInfo('ClusterController::wakeUpMachinesForPendingJobs : all required machines are awake. Now give %d seconds to SGE to allocate slots.' % iSGE_CHEK_RUNNABLE_JOBS_DELAY)
            # wait until SGE has a chance to allocate slots
            # note : this is annoying because it blocks the main thread. This could be improved if we forbid the machines to go to sleep for that much time....
            time.sleep(iSGE_CHEK_RUNNABLE_JOBS_DELAY)
            logInfo('ClusterController::wakeUpMachinesForPendingJobs : end of the delay given to SGE to allocate slots')

    def updateNormalState(self):
        """
        performs one control step : puts idle machines to sleep, then wakes up machines for pending jobs
        """
        # attempt to shut down machines that are idle
        self.putIdleMachinesToSleep()
        # wake up necessary machines if there are pending jobs
        self.wakeUpMachinesForPendingJobs()

    def storeSessionInDatabase(self):
        """
        creates the database records that describe this session (run) of the cluster controller

        returns : the identifier of the created session
        """
        conn = MySQLdb.connect('simpatix10', 'clusterctrl', '', 'clustercontroller')
        try:
            # stores information about the session
            sqlCommand = "INSERT INTO `sessions_desc` (`start_time`, end_time, `program_version`, `machine_name`, `pid`, num_controlled_machines) VALUES (NOW(), NOW(), '%s', 'simpatix10', %d, %d);" % (VERSION, os.getpid(), len(self.m_clusterStatus.m_clusterNodes))
            print(sqlCommand)
            conn.query(sqlCommand)

            # retrieve the session id, as it's an auto_increment field : ask for the id generated by the
            # insert above rather than guessing it beforehand from information_schema, which can give a
            # wrong id if another session is created in between or if the table statistics are stale
            iSessionId = conn.insert_id()

            # initialize the energy savings table
            sqlCommand = "INSERT INTO session_to_energy_savings (session_id, energy_savings_kwh) VALUES (%d,0.0);" % (iSessionId)
            print(sqlCommand)
            conn.query(sqlCommand)
        finally:
            # don't leak the connection if one of the queries fails
            conn.close()
        print('Session Iid = %d' % iSessionId)
        return iSessionId

    def updateSessionEnergyConsumptionInDatabase(self):
        """
        updates the energy savings and the end time of the current session (m_iSessionId) in the database
        """
        # NOTE(review): connects as root with an empty password while storeSessionInDatabase uses the clusterctrl account -- confirm which account is intended
        conn = MySQLdb.connect('simpatix10', 'root', '', 'clustercontroller')
        try:
            # update energy savings for the current session
            sqlCommand = "UPDATE session_to_energy_savings SET energy_savings_kwh=%f WHERE session_id=%d;" % ( jouleToKwh(self.m_clusterStatus.getEnergySavings()) ,self.m_iSessionId)
            print(sqlCommand)
            conn.query(sqlCommand)

            # update the end time of the current session
            sqlCommand = "UPDATE sessions_desc SET end_time=NOW() WHERE session_id=%d;" % (self.m_iSessionId)
            print(sqlCommand)
            conn.query(sqlCommand)
        finally:
            # don't leak the connection if one of the queries fails
            conn.close()

    def setControlOnMachine(self, machineName, bControl):
        """
        adds or removes the control of ClusterController on the given machine
        """
        self.m_clusterStatus.setControlOnMachine(machineName, bControl)

    def _logEnergyStatus(self, startTime):
        """
        logs the power state of the machines, the slots usage and the power and energy estimations, then
        stores the energy figures of the session in the database

        startTime : the start of the measures (a time.struct_time), only used in the logged message
        """
        iNumMachines = len(self.m_clusterStatus.m_clusterNodes)
        iNumMachinesOn = 0
        iNumSleepingMachines = 0
        for machine in self.m_clusterStatus.m_clusterNodes.itervalues():
            ePowerState = machine.getPowerState()
            if ePowerState == PowerState.ON:
                iNumMachinesOn += 1
            elif ePowerState == PowerState.SLEEP:
                iNumSleepingMachines += 1
        logInfo('%d machines (%d ON, %d SLEEPING)' % (iNumMachines, iNumMachinesOn, iNumSleepingMachines))
        iNumSlots = self.m_clusterStatus.getNumControlledSlots()
        iNumUsedSlots = self.m_clusterStatus.getNumUsedSlots()
        iNumWastedSlots = self.m_clusterStatus.getNumWastedSlots()
        iNumSleepingSlots = self.m_clusterStatus.getNumSleepingSlots()
        logInfo('%d slots (%d used, %d wasted, %d sleeping)' % (iNumSlots, iNumUsedSlots, iNumWastedSlots, iNumSleepingSlots ))
        logInfo('cluster estimated power consumption : %f W (saving from cluster controller : %f W)' % (self.m_clusterStatus.getCurrentPowerConsumption(), self.m_clusterStatus.getCurrentPowerSavings()) )
        logInfo('cluster estimated energy consumption since %s : %f kWh (saving from cluster controller : %f kWh)' % (time.asctime(startTime), jouleToKwh(self.m_clusterStatus.getEnergyConsumption()), jouleToKwh(self.m_clusterStatus.getEnergySavings())))
        self.updateSessionEnergyConsumptionInDatabase()

    def run(self):
        """
        main loop of the cluster controller : registers the session in the database, starts the
        reading threads and the web server, then repeatedly logs the energy status (at most once
        every DELAY_BETWEEN_ENERGY_STATUS_LOGS seconds) and updates the state of the cluster, until
        m_bStop becomes True
        """
        self.m_iSessionId = self.storeSessionInDatabase()
        log("storeSessionInDatabase completed")
        DELAY_BETWEEN_MEASURES = 10  # in seconds
        self.m_clusterStatus.startReadingThreads()
        self.m_webServer.start()
        while not self.m_clusterStatus.isReady():
            log('waiting for system to be ready')
            time.sleep(1)
        logInfo('ClusterController::run : cluster initial readings have completed')
        startTime = time.localtime()
        while not self.m_bStop:
            currentTime = time.time()
            if (self.m_lastEnergyStatusLogTime is None) or (currentTime > (self.m_lastEnergyStatusLogTime + self.DELAY_BETWEEN_ENERGY_STATUS_LOGS)):
                self._logEnergyStatus(startTime)
                self.m_lastEnergyStatusLogTime = currentTime

            self.updateNormalState()
            time.sleep(DELAY_BETWEEN_MEASURES)
        self.m_clusterStatus.stopReadingThreads()
|
|
|
|
|
|
def storeClusterNodeStatus( clusterNodeStatus ):
    """
    stores the current reading of each sensor of the given cluster node in the simpa_measurements database

    clusterNodeStatus : object providing m_clusterNodeName and m_sensors (a dictionary of sensors); each
        sensor provides typeName(), m_name and, depending on its type, m_rpms ('Fan') or m_temperature ('Temperature')
    raises AssertionError if a sensor is neither a 'Fan' nor a 'Temperature'
    """
    # NOTE(review): connects as root with an empty password; an earlier (commented out) version of this
    # line used a dedicated 'measures_writer' account -- confirm which account is intended
    conn = MySQLdb.connect('simpatix10', 'root', '', 'simpa_measurements')
    try:
        for sensor in clusterNodeStatus.m_sensors.itervalues():
            sensorId = clusterNodeStatus.m_clusterNodeName + '_' + sensor.m_name
            sensorType = sensor.typeName()
            if sensorType == 'Fan':
                sqlCommand = "INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('%s', %s, NOW());" % (sensorId, str(sensor.m_rpms))
            elif sensorType == 'Temperature':
                sqlCommand = "INSERT INTO `temperature_logs` (`temp_sensor_id`, `temperature`, `date`) VALUES ('%s', %s, NOW());" % (sensorId, str(sensor.m_temperature))
            else:
                # raised explicitly (instead of using an assert statement) so that unexpected sensors are also detected when python runs with -O
                raise AssertionError('unexpected sensor type : %s' % sensorType)
            print(sqlCommand)
            conn.query(sqlCommand)
    finally:
        # don't leak the connection if a query fails or if a sensor is unexpected
        conn.close()
|
|
|
|
if __name__ == '__main__':
    # entry point of the program : creates the cluster controller and runs it until it stops or fails
    try:
        logInfo('ClusterController v. %s starting....' % VERSION)
        controller = ClusterController()
        controller.run()
    except BaseException as exception:  # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
        Util.onException(exception)
|