#!/usr/bin/env python
import sys
sys.path.insert(0, '..')
import os
import threading
from Lib.Util import *
from Lib.SimpaDbUtil import *
import time
from ClusterStatus import ClusterStatus
from SlotAllocator import *
from Log import *
from ClusterNodeStatusUpdater import *
from SunGridEngine import SunGridEngine
import Util
from HTMLParser import HTMLParser

VERSION = '1.18'


class MyHTMLParser(HTMLParser):
    """HTML parser that collects the non-blank text fragments of a document."""

    def __init__(self):
        HTMLParser.__init__(self)
        self.TokenList = []  # stripped, non-empty text fragments, in the order they were encountered

    def handle_data(self, data):
        data = data.strip()
        if data:
            self.TokenList.append(data)

    def GetTokenList(self):
        """Returns the list of text fragments collected so far."""
        return self.TokenList


class WakeUpCompleteNotifier(IWakeUpCompleteNotifier):
    """Forwards the 'wake up complete' notification of one machine to the cluster controller."""

    def __init__(self, machineName, clusterController):
        self.m_machineName = machineName
        self.m_clusterController = clusterController

    def onWakeUpComplete(self):
        logDebug('WakeUpCompleteNotifier::onWakeUpComplete : start')
        self.m_clusterController.onMachineWakeUpComplete(self.m_machineName)


class SleepCompleteNotifier(ISleepCompleteNotifier):
    """Forwards the 'sleep complete' notification of one machine to the cluster controller."""

    def __init__(self, machineName, clusterController):
        self.m_machineName = machineName
        self.m_clusterController = clusterController

    def onSleepComplete(self, bSleepSucceeded):
        """
        :param bSleepSucceeded: True if the machine went to sleep, False if the request could not be honoured
        """
        logDebug('SleepCompleteNotifier::onSleepComplete : start')
        self.m_clusterController.onMachineSleepComplete(self.m_machineName, bSleepSucceeded)


def jouleToKwh(fEnergyInJoules):
    """
    converts joules to kWh
    """
    # 1 kWh = 1000 * 3600 J
    return fEnergyInJoules / (1000.0 * 3600.0)


class ClusterController:
    """
    The cluster controller monitors the cluster's activity and has multiple purposes:
    - energy saving: it can put some machines to sleep if they have nothing to do, or
      wake them up when needed (eg when a new job has arrived)
    - auto-repair, for example:
        - it sometimes happened that the sge_execd process disappeared for some unknown
          reason; in that case, the cluster controller can detect it and restart the
          daemon automatically, without the administrator's intervention
        - clearing the Error state of queues
    - it could also be used to dynamically adapt sge's settings to the requirements of
      jobs (eg add some machines to a queue)
    - (idea) a mechanism to let users get priority
    """

    def __init__(self):
        gridEngine = SunGridEngine()
        self.m_clusterStatus = ClusterStatus(gridEngine)
        self.m_slotAllocator = DecoupledSlotAllocator()  # alternative: SimpleSlotAllocator()
        # machine name -> machine, for machines whose wake-up request has not completed yet
        self.m_machinesThatNeedWakeUp = {}
        self.m_machinesThatNeedWakeupLock = threading.Lock()  # to prevent concurrent access to m_machinesThatNeedWakeUp
        # machine name -> machine, for machines whose sleep request has not completed yet
        self.m_machinesThatNeedSleeping = {}
        self.m_machinesThatNeedSleepingLock = threading.Lock()  # to prevent concurrent access to m_machinesThatNeedSleeping
        self.m_lastEnergyStatusLogTime = None
        self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60  # in seconds
        self.m_iSessionId = None  # session (run) identifier in database

    def getClusterStatus(self):
        return self.m_clusterStatus

    def log(self, message):
        print(message)

    def shutdownLeastImportantNode(self):
        self.log("ClusterController::shutdownLeastImportantNode : start")

    def onMachineWakeUpComplete(self, machineName):
        """Callback (via WakeUpCompleteNotifier) telling that machineName's wake-up request has been handled."""
        # 'with' guarantees the lock is released even if the removal raises; otherwise
        # getNumPendingWakeUps (and thus the main loop) would block forever
        with self.m_machinesThatNeedWakeupLock:
            del self.m_machinesThatNeedWakeUp[machineName]
        logDebug('ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it\'s now awake' % machineName)

    def onMachineSleepComplete(self, machineName, bSleepSucceeded):
        """Callback (via SleepCompleteNotifier) telling that machineName's sleep request has been handled."""
        with self.m_machinesThatNeedSleepingLock:
            del self.m_machinesThatNeedSleeping[machineName]
        if bSleepSucceeded:
            logDebug('ClusterController::onMachineSleepComplete : removed %s from the list of machines that need sleeping because it\'s now asleep' % machineName)
        else:
            logDebug('ClusterController::onMachineSleepComplete : removed %s from the list of machines that need sleeping because it can\'t be put to sleep at the moment (eg a job just arrived)' % machineName)

    def getNumPendingWakeUps(self):
        with self.m_machinesThatNeedWakeupLock:
            return len(self.m_machinesThatNeedWakeUp)

    def getNumPendingSleeps(self):
        with self.m_machinesThatNeedSleepingLock:
            return len(self.m_machinesThatNeedSleeping)

    def putIdleMachinesToSleep(self):
        """Requests sleep for every idle machine that is ON (except simpatix10), then waits until all requests are handled."""
        machinesToPutToSleep = {}
        self.m_clusterStatus.m_lock.acquire()
        try:
            idleMachines = self.m_clusterStatus.getIdleMachines()
            for idleMachine in idleMachines.itervalues():
                if idleMachine.getPowerState() == PowerState.ON:
                    # never put simpatix10 to sleep because it's the sge master and is also server for other things
                    if idleMachine.getName() != 'simpatix10':
                        machinesToPutToSleep[idleMachine.getName()] = idleMachine
        finally:
            self.m_clusterStatus.m_lock.release()

        with self.m_machinesThatNeedSleepingLock:
            self.m_machinesThatNeedSleeping.update(machinesToPutToSleep)
            # take a copy so that we don't iterate on m_machinesThatNeedSleeping, which the sleep callbacks alter
            listOfMachinesThatNeedSleeping = list(self.m_machinesThatNeedSleeping.values())

        # the requests are issued without holding m_machinesThatNeedSleepingLock because the
        # completion callback acquires it
        for machine in listOfMachinesThatNeedSleeping:
            logInfo('ClusterController::putIdleMachinesToSleep : requesting sleep for %s because it\'s idle' % machine.getName())
            machine.requestSleep(SleepCompleteNotifier(machine.getName(), self))

        if listOfMachinesThatNeedSleeping:
            # hack : wait until the sleep requests are handled so that we don't request the same machine to sleep multiple times
            while self.getNumPendingSleeps() > 0:
                time.sleep(1)

    def wakeUpMachinesForPendingJobs(self):
        """Wakes up the machines the slot allocator needs for the pending jobs, then gives SGE time to start them."""
        listOfMachinesThatNeedWakeUp = []
        self.m_clusterStatus.m_lock.acquire()
        try:
            pendingJobs = self.m_clusterStatus.getPendingJobs()
            if len(pendingJobs) != 0:
                machinesThatNeedWakeUp = self.m_slotAllocator.getMachinesThatNeedWakeUp(pendingJobs, self.m_clusterStatus)
                with self.m_machinesThatNeedWakeupLock:
                    self.m_machinesThatNeedWakeUp = machinesThatNeedWakeUp
                    # take a copy so that we don't iterate on m_machinesThatNeedWakeUp, which the wake-up callbacks alter
                    listOfMachinesThatNeedWakeUp = list(machinesThatNeedWakeUp.values())
                # issued without holding m_machinesThatNeedWakeupLock because the completion callback acquires it
                for machine in listOfMachinesThatNeedWakeUp:
                    logInfo('ClusterController::wakeUpMachinesForPendingJobs : requesting wake up for ' + machine.getName())
                    machine.requestWakeUp(WakeUpCompleteNotifier(machine.getName(), self))
        finally:
            self.m_clusterStatus.m_lock.release()

        if listOfMachinesThatNeedWakeUp:
            # hack : wait until the wakeup requests are handled so that a later sleep request doesn't cancel them,
            # and also wait for the jobs to come in
            while self.getNumPendingWakeUps() > 0:
                time.sleep(1)
            # max time SGE takes between a queued job becoming runnable and SGE actually starting it
            # (long on purpose: qstat sometimes takes a long time to realise that a machine is available after it has been woken up)
            iSGE_CHEK_RUNNABLE_JOBS_DELAY = 60 * 5
            logInfo('ClusterController::wakeUpMachinesForPendingJobs : all required machines are awake. Now give %d seconds to SGE to allocate slots.' % iSGE_CHEK_RUNNABLE_JOBS_DELAY)
            # wait until SGE has a chance to allocate slots
            # note : this blocks the main thread. This could be improved by forbidding the machines to go to sleep for that long instead.
            time.sleep(iSGE_CHEK_RUNNABLE_JOBS_DELAY)
            logInfo('ClusterController::wakeUpMachinesForPendingJobs : end of the delay given to SGE to allocate slots')

    def updateNormalState(self):
        # attempt to shut down machines that are idle
        self.putIdleMachinesToSleep()
        # wake up necessary machines if there are pending jobs
        self.wakeUpMachinesForPendingJobs()

    def storeSessionInDatabase(self):
        """
        Records a new session (run) of the cluster controller in the database.

        :return: the session id generated for the new row of sessions_desc
        """
        conn = MySQLdb.connect('simpatix10', 'root', '', 'clustercontroller')
        assert conn
        try:
            # stores information about the session
            sqlCommand = "INSERT INTO `sessions_desc` (`start_time`, end_time, `program_version`, `machine_name`, `pid`, num_controlled_machines) VALUES (NOW(), NOW(), '%s', 'simpatix10', %d, %d);" % (VERSION, os.getpid(), len(self.m_clusterStatus.m_clusterNodes))
            print(sqlCommand)
            conn.query(sqlCommand)
            # the id actually generated by this INSERT on this connection; predicting it beforehand from
            # information_schema.TABLES.AUTO_INCREMENT is racy and the value may be a stale cached statistic
            iSessionId = conn.insert_id()

            # initialize the energy savings table
            sqlCommand = "INSERT INTO session_to_energy_savings (session_id, energy_savings_kwh) VALUES (%d,0.0);" % (iSessionId)
            print(sqlCommand)
            conn.query(sqlCommand)
        finally:
            conn.close()
        print('Session Id = %d' % iSessionId)
        return iSessionId

    def updateSessionEnergyConsumptionInDatabase(self):
        """Stores the current energy savings and end time of the current session in the database."""
        conn = MySQLdb.connect('simpatix10', 'root', '', 'clustercontroller')
        assert conn
        try:
            # update energy savings for the current session
            sqlCommand = "UPDATE session_to_energy_savings SET energy_savings_kwh=%f WHERE session_id=%d;" % (jouleToKwh(self.m_clusterStatus.getEnergySavings()), self.m_iSessionId)
            print(sqlCommand)
            conn.query(sqlCommand)

            # update the end time of the current session
            sqlCommand = "UPDATE sessions_desc SET end_time=NOW() WHERE session_id=%d;" % (self.m_iSessionId)
            print(sqlCommand)
            conn.query(sqlCommand)
        finally:
            conn.close()

    def _logEnergyStatus(self, startTime):
        """Logs machine/slot counts and power/energy estimates, then saves the session's energy savings in the database."""
        iNumMachines = len(self.m_clusterStatus.m_clusterNodes)
        iNumMachinesOn = 0
        iNumSleepingMachines = 0
        for machine in self.m_clusterStatus.m_clusterNodes.itervalues():
            ePowerState = machine.getPowerState()
            if ePowerState == PowerState.ON:
                iNumMachinesOn += 1
            elif ePowerState == PowerState.SLEEP:
                iNumSleepingMachines += 1
        logInfo('%d machines (%d ON, %d SLEEPING)' % (iNumMachines, iNumMachinesOn, iNumSleepingMachines))
        iNumSlots = self.m_clusterStatus.getNumControlledSlots()
        iNumUsedSlots = self.m_clusterStatus.getNumUsedSlots()
        iNumWastedSlots = self.m_clusterStatus.getNumWastedSlots()
        iNumSleepingSlots = self.m_clusterStatus.getNumSleepingSlots()
        logInfo('%d slots (%d used, %d wasted, %d sleeping)' % (iNumSlots, iNumUsedSlots, iNumWastedSlots, iNumSleepingSlots))
        logInfo('cluster estimated power consumption : %f W (saving from cluster controller : %f W)' % (self.m_clusterStatus.getCurrentPowerConsumption(), self.m_clusterStatus.getCurrentPowerSavings()))
        logInfo('cluster estimated energy consumption since %s : %f kWh (saving from cluster controller : %f kWh)' % (time.asctime(startTime), jouleToKwh(self.m_clusterStatus.getEnergyConsumption()), jouleToKwh(self.m_clusterStatus.getEnergySavings())))
        self.updateSessionEnergyConsumptionInDatabase()

    def run(self):
        """
        Main loop: records the session, waits for the initial cluster readings, then periodically
        logs the energy status and puts machines to sleep / wakes them up as needed.
        The reading threads are stopped whenever the loop is left (eg on ctrl+C).
        """
        self.m_iSessionId = self.storeSessionInDatabase()
        DELAY_BETWEEN_MEASURES = 10  # in seconds
        self.m_clusterStatus.startReadingThreads()
        try:
            while not self.m_clusterStatus.isReady():
                time.sleep(1)
            logInfo('ClusterController::run : cluster initial readings have completed')
            startTime = time.localtime()
            while True:
                currentTime = time.time()
                if (self.m_lastEnergyStatusLogTime is None) or (currentTime > (self.m_lastEnergyStatusLogTime + self.DELAY_BETWEEN_ENERGY_STATUS_LOGS)):
                    self._logEnergyStatus(startTime)
                    self.m_lastEnergyStatusLogTime = currentTime
                self.updateNormalState()
                time.sleep(DELAY_BETWEEN_MEASURES)
        finally:
            # previously placed after 'while True' and therefore never reached
            self.m_clusterStatus.stopReadingThreads()


def storeClusterNodeStatus(clusterNodeStatus):
    """
    Stores the fan and temperature sensor readings of one cluster node in the simpa_measurements database.
    Any sensor type other than 'Fan' or 'Temperature' triggers an assertion error.
    """
    # NOTE: a dedicated 'measures_writer' account was once considered instead of root
    conn = MySQLdb.connect('simpatix10', 'root', '', 'simpa_measurements')
    assert conn
    try:
        for sensor in clusterNodeStatus.m_sensors.itervalues():
            sensorId = clusterNodeStatus.m_clusterNodeName + '_' + sensor.m_name
            if sensor.typeName() == 'Fan':
                sqlCommand = """INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.m_rpms) + """, NOW());"""
                print(sqlCommand)
                conn.query(sqlCommand)
            elif sensor.typeName() == 'Temperature':
                sqlCommand = """INSERT INTO `temperature_logs` (`temp_sensor_id`, `temperature`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.m_temperature) + """, NOW());"""
                print(sqlCommand)
                conn.query(sqlCommand)
            else:
                assert False, 'unexpected sensor type: %s' % sensor.typeName()
    finally:
        conn.close()


if __name__ == '__main__':
    try:
        logInfo('ClusterController v. %s starting....' % VERSION)
        controller = ClusterController()
        controller.run()
    except BaseException as exception:  # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
        Util.onException(exception)