cocluto/ClusterController/ClusterController.py

304 lines
14 KiB
Python
Raw Normal View History

#!/usr/bin/env python
import sys
sys.path.insert(0, '..')
import os
from Lib.Util import *
from Lib.SimpaDbUtil import *
import time
from ClusterStatus import ClusterStatus
from SlotAllocator import *
from Log import *
from ClusterNodeStatusUpdater import *
from SunGridEngine import SunGridEngine
import Util
from HTMLParser import HTMLParser
VERSION='1.18'
class MyHTMLParser(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
self.TokenList = []
def handle_data( self,data):
data = data.strip()
if data and len(data) > 0:
self.TokenList.append(data)
#print data
def GetTokenList(self):
return self.TokenList
class WakeUpCompleteNotifier( IWakeUpCompleteNotifier ):
def __init__(self, machineName, clusterController):
self.m_machineName = machineName
self.m_clusterController = clusterController
def onWakeUpComplete( self ):
logDebug('WakeUpCompleteNotifier::onWakeUpComplete : start')
self.m_clusterController.onMachineWakeUpComplete( self.m_machineName )
class SleepCompleteNotifier( ISleepCompleteNotifier ):
def __init__(self, machineName, clusterController):
self.m_machineName = machineName
self.m_clusterController = clusterController
def onSleepComplete( self, bSleepSucceeded ):
logDebug('WakeUpCompleteNotifier::onWakeUpComplete : start')
self.m_clusterController.onMachineSleepComplete( self.m_machineName, bSleepSucceeded )
def jouleToKwh( fEnergyInJoules ):
"""
converts joules to kWH
"""
# 1 kWh = 1000 * 3600 J
return fEnergyInJoules / (1000.0 * 3600.0)
class ClusterController:
"""
The cluster controller monitors the cluster's activity and has multiple purposes :
- energy saving : it can put some machines to sleep if they have nothing to do, or it
can wake them up when needed (eg when a new job has arrived)
- auto-repair : for examples
- it happened sometimes that sge_execd process disappeared for some unknown reason
in that case, the cluster controller can detect it and restart the daemon
automatically, without administrator's intervention
- clear the Error state of queues
- it could also be used to dynamically adapt sge's settings to the requirements of
jobs (eg add some machines to a queue).
Mechanism to let user get priority
"""
def __init__( self ):
gridEngine = SunGridEngine()
self.m_clusterStatus = ClusterStatus( gridEngine )
self.m_slotAllocator = DecoupledSlotAllocator() #SimpleSlotAllocator()
self.m_machinesThatNeedWakeUp = {}
self.m_machinesThatNeedWakeupLock = threading.Lock() # to prevent concurrent access to m_machinesThatNeedWakeUp
self.m_machinesThatNeedSleeping = {}
self.m_machinesThatNeedSleepingLock = threading.Lock() # to prevent concurrent access to m_machinesThatNeedSleeping
self.m_lastEnergyStatusLogTime = None
self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60 # in seconds
self.m_iSessionId = None # session (run) identifier in database
def getClusterStatus( self ):
return m_clusterStatus
def log( self, message ):
print message
def shutdownLeastImportantNode( self ):
self.log("ClusterController::shutdownLeastImportantNode : start")
def onMachineWakeUpComplete( self, machineName ):
self.m_machinesThatNeedWakeupLock.acquire()
#logDebug('ClusterController::onMachineWakeUpComplete : machine %s old len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)) )
del self.m_machinesThatNeedWakeUp[ machineName ]
#logDebug('ClusterController::onMachineWakeUpComplete : machine %s new len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)) )
self.m_machinesThatNeedWakeupLock.release()
logDebug('ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it\'s now awake' % machineName)
def onMachineSleepComplete( self, machineName, bSleepSucceeded ):
self.m_machinesThatNeedSleepingLock.acquire()
#logDebug('ClusterController::onMachineSleepComplete : machine %s old len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)) )
del self.m_machinesThatNeedSleeping[ machineName ]
#logDebug('ClusterController::onMachineSleepComplete : machine %s new len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)) )
self.m_machinesThatNeedSleepingLock.release()
if bSleepSucceeded:
logDebug('ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it\'s now awake' % machineName)
else:
logDebug('ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it can\'t be put to sleep at the moment (eg a job just arrived)' % machineName)
def getNumPendingWakeUps( self ):
self.m_machinesThatNeedWakeupLock.acquire()
numPendingWakeUps = len(self.m_machinesThatNeedWakeUp)
self.m_machinesThatNeedWakeupLock.release()
return numPendingWakeUps
def getNumPendingSleeps( self ):
self.m_machinesThatNeedSleepingLock.acquire()
numPendingSleeps = len(self.m_machinesThatNeedSleeping)
self.m_machinesThatNeedSleepingLock.release()
return numPendingSleeps
def putIdleMachinesToSleep( self ):
self.m_clusterStatus.m_lock.acquire()
idleMachines = self.m_clusterStatus.getIdleMachines()
# logInfo('idleMachines :')
self.m_machinesThatNeedToSleep = []
for machineName, idleMachine in idleMachines.iteritems():
if idleMachine.getPowerState() == PowerState.ON:
# logInfo('\t%s' % machineName)
if idleMachine.getName() != 'simpatix10': # never put simpatix10 to sleep because it's the sge master and is also server for other things
self.m_machinesThatNeedSleeping[idleMachine.getName()]=idleMachine
self.m_clusterStatus.m_lock.release()
listOfMachinesThatNeedSleeping = self.m_machinesThatNeedSleeping.values() # duplicate the list so that we don't iterate on m_machinesThatNeedSleeping, which could cause a runtime error because callbacks alter m_machinesThatNeedWakeUp
for machine in listOfMachinesThatNeedSleeping:
logInfo('ClusterController::putIdleMachinesToSleep : requesting sleep for %s because it\'s idle' % machine.getName())
machine.requestSleep( SleepCompleteNotifier( machine.getName(), self ) )
if len(listOfMachinesThatNeedSleeping) != 0:
# hack : wait until the sleep requests are handled so that we don't request the same machine to sleep multiple times
while self.getNumPendingSleeps() > 0:
time.sleep(1)
def wakeUpMachinesForPendingJobs(self):
listOfMachinesThatNeedWakeUp = []
self.m_clusterStatus.m_lock.acquire()
pendingJobs = self.m_clusterStatus.getPendingJobs()
"""
logInfo('pending jobs :')
for job in pendingJobs.itervalues():
logInfo('\t%d' % job.getId().asStr())
"""
if len(pendingJobs) != 0:
self.m_machinesThatNeedWakeUp = self.m_slotAllocator.getMachinesThatNeedWakeUp( pendingJobs, self.m_clusterStatus )
if len(self.m_machinesThatNeedWakeUp) == 0:
None
#logInfo('ClusterController::updateNormalState : no machine needs waking up' )
else:
listOfMachinesThatNeedWakeUp = self.m_machinesThatNeedWakeUp.values() # duplicate the list so that we don't iterate on m_machinesThatNeedWakeUp, which would cause a runtime error because callbacks alter m_machinesThatNeedWakeUp
for machine in listOfMachinesThatNeedWakeUp:
logInfo('ClusterController::wakeUpMachinesForPendingJobs : requesting wake up for '+machine.getName() )
machine.requestWakeUp( WakeUpCompleteNotifier( machine.getName(), self ) )
self.m_clusterStatus.m_lock.release()
if len(listOfMachinesThatNeedWakeUp) != 0:
# hack : wait until the wakeup requests are handled so that a later sleep request doesn't cancel it
# and also wait for the jobs to come in
while self.getNumPendingWakeUps() > 0:
time.sleep(1)
iSGE_CHEK_RUNNABLE_JOBS_DELAY = 60 * 5 # max time it takes for sge between the fact that a queued job is runnable and SGE actually starting it (I've put a long time here because sometimes, qstat takes a long time to ralise that the machine is available after I wake it up)
logInfo('ClusterController::wakeUpMachinesForPendingJobs : all required machines are awake. Now give %d seconds to SGE to allocate slots.' % iSGE_CHEK_RUNNABLE_JOBS_DELAY)
# wait until SGE has a chance to allocate slots
time.sleep(iSGE_CHEK_RUNNABLE_JOBS_DELAY) # note : this is annoying because it blocks the main thread. This could be improved if we forbid the machines to go to sleep for that much time....
logInfo('ClusterController::wakeUpMachinesForPendingJobs : end of the delay given to SGE to allocate slots')
def updateNormalState( self ):
# attempt to shut down machines that are idle
self.putIdleMachinesToSleep()
# wake up necessary machines if there are pending jobs
self.wakeUpMachinesForPendingJobs()
def storeSessionInDatabase( self ):
conn = MySQLdb.connect('simpatix10', 'root', '', 'clustercontroller')
assert(conn)
# retrieve the session id, as it's an auto_increment field
sqlCommand = "SELECT AUTO_INCREMENT FROM information_schema.TABLES WHERE TABLE_SCHEMA = 'clustercontroller' AND TABLE_NAME = 'sessions_desc'"
print sqlCommand
conn.query(sqlCommand)
r=conn.store_result()
iSessionId = r.fetch_row()[0][0]
# stores information about the session
sqlCommand = "INSERT INTO `sessions_desc` (`start_time`, end_time, `program_version`, `machine_name`, `pid`, num_controlled_machines) VALUES (NOW(), NOW(), '%s', 'simpatix10', %d, %d);" % (VERSION, os.getpid(), len(self.m_clusterStatus.m_clusterNodes))
print sqlCommand
conn.query(sqlCommand)
# initialize the energy savings table
sqlCommand = "INSERT INTO session_to_energy_savings (session_id, energy_savings_kwh) VALUES (%d,0.0);" % (iSessionId)
print sqlCommand
conn.query(sqlCommand)
conn.close()
print( 'Session Iid = %d' % iSessionId )
return iSessionId
def updateSessionEnergyConsumptionInDatabase( self ):
conn = MySQLdb.connect('simpatix10', 'root', '', 'clustercontroller')
assert(conn)
# update energy savings for the current session
sqlCommand = "UPDATE session_to_energy_savings SET energy_savings_kwh=%f WHERE session_id=%d;" % ( jouleToKwh(self.m_clusterStatus.getEnergySavings()) ,self.m_iSessionId)
print sqlCommand
conn.query(sqlCommand)
# update the end time of the current session
sqlCommand = "UPDATE sessions_desc SET end_time=NOW() WHERE session_id=%d;" % (self.m_iSessionId)
print sqlCommand
conn.query(sqlCommand)
conn.close()
def run( self ):
"""
"""
self.m_iSessionId = self.storeSessionInDatabase()
DELAY_BETWEEN_MEASURES = 10 # in seconds
self.m_clusterStatus.startReadingThreads()
while not self.m_clusterStatus.isReady():
#log('waiting for system to be ready')
time.sleep(1)
None
logInfo('ClusterController::run : cluster initial readings have completed')
startTime = time.localtime()
while True:
currentTime = time.time()
#clusterStatus.m_nodesStatus['simpatix10'].dump()
if (not self.m_lastEnergyStatusLogTime) or (currentTime > (self.m_lastEnergyStatusLogTime +self.DELAY_BETWEEN_ENERGY_STATUS_LOGS)):
iNumMachines = len(self.m_clusterStatus.m_clusterNodes)
iNumMachinesOn = 0
iNumSleepingMachines = 0
for machine in self.m_clusterStatus.m_clusterNodes.itervalues():
ePowerState = machine.getPowerState()
if ePowerState == PowerState.ON:
iNumMachinesOn+=1
elif ePowerState == PowerState.SLEEP:
iNumSleepingMachines+=1
logInfo('%d machines (%d ON, %d SLEEPING)' % (iNumMachines, iNumMachinesOn, iNumSleepingMachines))
iNumSlots = self.m_clusterStatus.getNumControlledSlots()
iNumUsedSlots = self.m_clusterStatus.getNumUsedSlots()
iNumWastedSlots = self.m_clusterStatus.getNumWastedSlots()
iNumSleepingSlots = self.m_clusterStatus.getNumSleepingSlots()
logInfo('%d slots (%d used, %d wasted, %d sleeping)' % (iNumSlots, iNumUsedSlots, iNumWastedSlots, iNumSleepingSlots ))
logInfo('cluster estimated power consumption : %f W (saving from cluster controller : %f W)' % (self.m_clusterStatus.getCurrentPowerConsumption(), self.m_clusterStatus.getCurrentPowerSavings()) )
logInfo('cluster estimated energy consumption since %s : %f kWh (saving from cluster controller : %f kWh)' % (time.asctime(startTime), jouleToKwh(self.m_clusterStatus.getEnergyConsumption()), jouleToKwh(self.m_clusterStatus.getEnergySavings())))
self.updateSessionEnergyConsumptionInDatabase()
self.m_lastEnergyStatusLogTime = currentTime
self.updateNormalState()
time.sleep(DELAY_BETWEEN_MEASURES)
self.m_clusterStatus.stopReadingThreads()
def storeClusterNodeStatus( clusterNodeStatus ):
#conn = MySQLdb.connect('simpatix10', 'measures_writer', '', 'simpa_measurements')
conn = MySQLdb.connect('simpatix10', 'root', '', 'simpa_measurements')
assert(conn)
#conn.query("""INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('titi', 2000, NOW());""")
'''
conn.query("""SELECT * FROM fan_rpm_logs""")
r=conn.store_result()
print r.fetch_row()[0]
'''
for key, sensor in clusterNodeStatus.m_sensors.iteritems():
sensorId = clusterNodeStatus.m_clusterNodeName + '_' + sensor.m_name
if sensor.typeName() == 'Fan':
sqlCommand = """INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('"""+sensorId+"""', """+str(sensor.m_rpms)+""", NOW());"""
print sqlCommand
conn.query(sqlCommand)
elif sensor.typeName() == 'Temperature':
sqlCommand = """INSERT INTO `temperature_logs` (`temp_sensor_id`, `temperature`, `date`) VALUES ('"""+sensorId+"""', """+str(sensor.m_temperature)+""", NOW());"""
print sqlCommand
conn.query(sqlCommand)
else:
assert(False)
conn.close()
if __name__ == '__main__':
#Lib.Util.sendTextMail( 'SimpaCluster <guillaume.raffy@univ-rennes1.fr>', 'guillaume.raffy@univ-rennes1.fr', 'mail subject', 'mail content')
try:
logInfo('ClusterController v. %s starting....' % VERSION)
#executeCommand('ping -o -t 1 simpatix310 > /dev/null')
#print executeCommand('ssh simpatix10 "ipmitool sensor"')
#assert False, 'prout'
controller = ClusterController()
controller.run()
#machineNameToMacAddress( 'simpatix10' )
#except AssertionError, error:
#except KeyboardInterrupt, error:
except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
Util.onException(exception)