From 23b96654341a141e0e70681ff9237ca088956a5b Mon Sep 17 00:00:00 2001 From: Guillaume Raffy Date: Fri, 7 Oct 2011 15:43:45 +0000 Subject: [PATCH] =?UTF-8?q?Ai=20remis=20le=20cluster=20controller=20en=20r?= =?UTF-8?q?oute.=20Il=20n'est=20pas=20si=20simple=20que=20=C3=A7a=20=C3=A0?= =?UTF-8?q?=20d=C3=A9marrer,=20alors=20j'en=20ai=20profit=C3=A9=20pour=20?= =?UTF-8?q?=C3=A9crire=20l'installeur,=20qui=20tient=20=C3=A9galement=20li?= =?UTF-8?q?eu=20de=20documentation.=20(partie=201)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ClusterController/ClusterController.plist | 25 ++ ClusterController/ClusterNode.py | 140 ++++++++ ClusterController/ClusterNodeStatusUpdater.py | 188 +++++++++++ ClusterController/ClusterStatus.py | 185 +++++++++++ ClusterController/Install.py | 42 +++ ClusterController/Job.py | 102 ++++++ ClusterController/JobsState.py | 71 ++++ ClusterController/JobsStateUpdater.py | 35 ++ ClusterController/Log.py | 30 ++ ClusterController/Main.py | 303 ++++++++++++++++++ ClusterController/PowerState.py | 21 ++ ClusterController/QstatParser.py | 185 +++++++++++ ClusterController/QueueMachine.py | 25 ++ ClusterController/SlotAllocator.py | 141 ++++++++ ClusterController/SunGridEngine.py | 48 +++ ClusterController/TestBug00000003.py | 53 +++ ClusterController/Util.py | 214 +++++++++++++ ClusterController/__init__.py | 1 + 18 files changed, 1809 insertions(+) create mode 100644 ClusterController/ClusterController.plist create mode 100644 ClusterController/ClusterNode.py create mode 100644 ClusterController/ClusterNodeStatusUpdater.py create mode 100644 ClusterController/ClusterStatus.py create mode 100755 ClusterController/Install.py create mode 100644 ClusterController/Job.py create mode 100644 ClusterController/JobsState.py create mode 100644 ClusterController/JobsStateUpdater.py create mode 100644 ClusterController/Log.py create mode 100644 ClusterController/Main.py create mode 100644 ClusterController/PowerState.py create mode 100644 ClusterController/QstatParser.py create mode 100644 ClusterController/QueueMachine.py create mode 100644 ClusterController/SlotAllocator.py create mode 100644 ClusterController/SunGridEngine.py create mode 100755 ClusterController/TestBug00000003.py create mode 100644 ClusterController/Util.py create mode 100644 ClusterController/__init__.py diff --git a/ClusterController/ClusterController.plist b/ClusterController/ClusterController.plist new file mode 100644 index 0000000..459c34a --- /dev/null +++ b/ClusterController/ClusterController.plist @@ -0,0 +1,25 @@ + + + + + Label + fr.univ-rennes1.ipr.ClusterController + + UserName + root + WorkingDirectory + /usr/local/bin/ipr/Python/ClusterController + ProgramArguments + + ./ClusterControllerLauncher.sh + + StandardOutPath + /var/log/ClusterController.stdout + StandardErrorPath + /var/log/ClusterController.stderr + + diff --git a/ClusterController/ClusterNode.py b/ClusterController/ClusterNode.py new file mode 100644 index 0000000..c51d9e2 --- /dev/null +++ b/ClusterController/ClusterNode.py @@ -0,0 +1,140 @@ +import threading +from PowerState import * +from ClusterNodeStatusUpdater import * +import Lib.Util +import Lib.SimpaDbUtil + +from datetime import * + +class ClusterNode: + """ + the state of a machine node + """ + def __init__( self, machineName, cluster, gridEngine ): + self.m_name = machineName + self.m_cluster = cluster # the cluster this machine belongs to + self.m_requestedPowerState = PowerState.ON + self.m_powerState = PowerState.UNKNOWN + self.m_lastPowerStateTime = None # time at which the last value of self.m_powerState has been set + self.m_machineStatusUpdater = ClusterNodeStatusUpdater( machineName, self, gridEngine ) + self.m_energyConsumption = 0.0 # estimate of the energy consumption of this machine since the start of cluster controller (in joules) + self.m_energySavings = 0.0 # estimate of the energy savings on this machine caused by the cluster controller since it started (in joules) + + def getName( self ): + return self.m_name + + def isReady( self ): + if self.m_powerState == PowerState.UNKNOWN: + #logInfo( self.m_name + ' is not ready (waiting for power state)' ) + return False + if self.m_powerState == PowerState.ON: + return True + #log( self.m_name + ' is ready' ) + return True + + def getPowerState( self ): + return self.m_powerState + + def setShouldAlwaysBeOn( self ): + self.m_machineStatusUpdater.setShouldAlwaysBeOn( ) + self.setPowerState( PowerState.ON ) + + def setPowerState( self, powerState ): + bUpdateRequiredChecks = False + if self.m_powerState == PowerState.UNKNOWN: + logInfo('ClusterNode::setPowerState : '+self.m_name+'\'s power state has been initialized to '+PowerStateToStr( powerState )) + self.m_powerState = powerState + self.m_lastPowerStateTime = datetime.now() + bUpdateRequiredChecks = True + else: + # update the estimation of energy consumption + self.updateEnergyMeasurements() + # then change the power state + if self.m_powerState != powerState: + logInfo('ClusterNode::setPowerState : '+self.m_name+'\'s power state has been changed to '+PowerStateToStr( powerState )) + self.m_powerState = powerState + self.m_lastPowerStateTime = datetime.now() + bUpdateRequiredChecks = True + if bUpdateRequiredChecks: + if self.m_powerState == PowerState.ON: + self.m_machineStatusUpdater.m_bCheckPowerState = True + self.m_machineStatusUpdater.m_bCheckSensors = True + elif self.m_powerState == PowerState.OFF: + self.m_machineStatusUpdater.m_bCheckPowerState = True + self.m_machineStatusUpdater.m_bCheckSensors = False + elif self.m_powerState == PowerState.SLEEP: + self.m_machineStatusUpdater.m_bCheckPowerState = True + self.m_machineStatusUpdater.m_bCheckSensors = False + elif self.m_powerState == PowerState.UNPLUGGED: + self.m_machineStatusUpdater.m_bCheckPowerState = True + self.m_machineStatusUpdater.m_bCheckSensors = False + else: + assert( False ) + + def onNewPowerStateReading( self, powerState ): + """ + called when a new powerstate reading arrives + """ + if powerState != self.getPowerState(): + if self.getPowerState() != PowerState.UNKNOWN: + logWarning('ClusterNode::onNewPowerStateReading : '+self.m_name+'\'s power state has been (manually it seems) changed to '+PowerStateToStr( powerState )) + self.setPowerState( powerState ) + + def getPowerConsumptionForPowerState( self, ePowerState ): + """ + returns the power consumption estimation (in watts) of this machine for the given power state + """ + fCurrentIntensity = 0.0 + fCurrentVoltage = 220.0 + # noticed on 26.08.2009 that putting 22 machines from sleep to on eats 17 A, resulting in difference of 0.77 A per machine + if ePowerState == PowerState.ON: + fCurrentIntensity = 0.9 # value when the machine is doing nothing + elif ePowerState == PowerState.OFF: + fCurrentIntensity = 0.1 + elif ePowerState == PowerState.SLEEP: + fCurrentIntensity = 0.1 + elif ePowerState == PowerState.UNPLUGGED: + fCurrentIntensity = 0.0 + else: + assert(False) + return fCurrentIntensity * fCurrentVoltage + + def updateEnergyMeasurements( self ): + timeInterval = datetime.now() - self.m_lastPowerStateTime + self.m_energyConsumption += self.getPowerConsumptionForPowerState( self.m_powerState ) * timeInterval.seconds + self.m_energySavings += ( self.getPowerConsumptionForPowerState( PowerState.ON ) - self.getPowerConsumptionForPowerState( self.m_powerState ) ) * timeInterval.seconds + self.m_lastPowerStateTime = datetime.now() + #logDebug('energy savings on %s : %f J' %(self.getName(), self.m_energySavings)) + + def getEnergyConsumption( self ): + """ + in joules + """ + self.updateEnergyMeasurements() + return self.m_energyConsumption + + def getPowerConsumption( self ): + fCurrentPowerConsumption = self.getPowerConsumptionForPowerState( self.m_powerState ) + #logDebug('getPowerConsumption of %s : %f (powerstate = %d)' % (self.getName(), fCurrentPowerConsumption, self.m_powerState)) + return fCurrentPowerConsumption + + def getEnergySavings( self ): + self.updateEnergyMeasurements() + return self.m_energySavings + + def onSleepFailedBecauseAJobJustArrived( self ): + logInfo('%s was scheduled to sleep but the sleep is canceled because it\'s currently executing a new job' % self.m_name) + + def requestSleep( self, sleepCompleteNotifier = None ): + self.m_machineStatusUpdater.requestSleep( sleepCompleteNotifier ) + + def requestWakeUp( self, wakeUpCompleteNotifier = None ): + self.m_machineStatusUpdater.requestWakeUp( wakeUpCompleteNotifier ) + + def getQueueMachineName( self ): + return self.getCluster().getJobsState().getQueueMachine( self.m_name ).getName() + assert( self.m_queueName != None ) + return self.m_queueName + + def getCluster( self ): + return self.m_cluster diff --git a/ClusterController/ClusterNodeStatusUpdater.py b/ClusterController/ClusterNodeStatusUpdater.py new file mode 100644 index 0000000..33b6946 --- /dev/null +++ b/ClusterController/ClusterNodeStatusUpdater.py @@ -0,0 +1,188 @@ +import threading +import time +import Lib.Util +import Lib.SimpaDbUtil +import os +import traceback +import sys +from PowerState import * +from QstatParser import * +import Util + +class IWakeUpCompleteNotifier: + """ + interface for wakeup notifiers + """ + def onWakeUpComplete( self ): + assert( False ) + +class ISleepCompleteNotifier: + """ + interface for sleep notifiers + """ + def onSleepComplete( self, bSleepSucceeded ): + assert( False ) + +class IRequest: + GO_TO_SLEEP = 1 + WAKE_UP = 2 + CHECK_POWER_STATE = 3 + + def __init__( self, requestType ): + self.m_type = requestType + + def getType( self ): + return self.m_type + + def process( self, clusterNodeStatusUpdater ): + """ + processes this request + """ + assert( False ) # this method is abstract + +class WakeUpRequest( IRequest ): + + def __init__( self, wakeUpNotifier ): + IRequest.__init__( self, IRequest.WAKE_UP ) + self.m_wakeUpNotifier = wakeUpNotifier + + def process( self, clusterNodeStatusUpdater ): + assert( clusterNodeStatusUpdater.m_bShouldAlwaysBeOn == False ) # are we attempting to wake up a machine that should always be on ? + logInfo('Handling wakeup request for %s' % clusterNodeStatusUpdater.getName() ) + bSuccess = blockingWakeUpMachine( clusterNodeStatusUpdater.getName() ) + assert( bSuccess ) + # activate the associated machine queue + if clusterNodeStatusUpdater.setQueueActivation( True ): + None # all is ok + else: + assert( False ) + clusterNodeStatusUpdater.m_stateLock.acquire() + clusterNodeStatusUpdater.m_clusterNode.setPowerState( PowerState.ON ) + clusterNodeStatusUpdater.m_stateLock.release() + if self.m_wakeUpNotifier: + logDebug('ClusterNodeStatusUpdater::run : Sending wakeup notification') + self.m_wakeUpNotifier.onWakeUpComplete() + +class SleepRequest( IRequest ): + + def __init__( self, sleepCompleteNotifier ): + IRequest.__init__( self, IRequest.GO_TO_SLEEP ) + self.m_sleepCompleteNotifier = sleepCompleteNotifier + + def process( self, clusterNodeStatusUpdater ): + assert( clusterNodeStatusUpdater.m_bShouldAlwaysBeOn == False ) # are we attempting to put a machine the should stay on to sleep ? + logInfo('Handling sleep request for %s' % clusterNodeStatusUpdater.getName() ) + if clusterNodeStatusUpdater.setQueueActivation( False ): + if clusterNodeStatusUpdater.queueIsEmpty(): + if blockingPutMachineToSleep( clusterNodeStatusUpdater.m_clusterNodeName ): + # now we know that the machine is asleep + clusterNodeStatusUpdater.m_stateLock.acquire() + clusterNodeStatusUpdater.m_clusterNode.setPowerState( PowerState.SLEEP ) + clusterNodeStatusUpdater.m_stateLock.release() + if self.m_sleepCompleteNotifier: + self.m_sleepCompleteNotifier.onSleepComplete( True ) + else: + assert( False ) + else: + # reactivate the queue + if not clusterNodeStatusUpdater.setQueueActivation( True ): + assert( False ) + clusterNodeStatusUpdater.m_stateLock.acquire() + clusterNodeStatusUpdater.m_clusterNode.setPowerState( PowerState.ON ) # this is necessary to reenable the various cyclic checks that were disabled on sleep request + clusterNodeStatusUpdater.m_stateLock.release() + clusterNodeStatusUpdater.m_clusterNode.onSleepFailedBecauseAJobJustArrived() + if self.m_sleepCompleteNotifier: + self.m_sleepCompleteNotifier.onSleepComplete( False ) + else: + assert( False ) + +class CheckPowerStateRequest( IRequest ): + + def __init__( self ): + IRequest.__init__( self, IRequest.CHECK_POWER_STATE ) + + def process( self, clusterNodeStatusUpdater ): + powerState = Util.getPowerState( clusterNodeStatusUpdater.m_clusterNodeName ) + clusterNodeStatusUpdater.m_stateLock.acquire() + clusterNodeStatusUpdater.m_clusterNode.onNewPowerStateReading( powerState ) + clusterNodeStatusUpdater.m_lastPowerStateCheckTime = time.time() + clusterNodeStatusUpdater.m_stateLock.release() + +class ClusterNodeStatusUpdater( threading.Thread ): + DELAY_BETWEEN_POWERSTATE_CHECKS=5*60 # in seconds + + def __init__( self, machineName, clusterNode, gridEngine ): + threading.Thread.__init__(self) + self.m_clusterNodeName = machineName + self.m_clusterNode = clusterNode + self.m_gridEngine = gridEngine + self.m_bStop = False + self.m_lastPowerStateCheckTime = None #time.time() + self.m_bCheckPowerState = True + self.m_stateLock = threading.Lock() # lock that prevents concurrent access to the state of this instance + self.m_bShouldAlwaysBeOn = False # indicates that the machine should never go to sleep or off for whatever reason (eg simpatix10) + self.m_pendingRequestsQueue = [] + + def getGridEngine( self ): + return self.m_gridEngine + + def getName( self ): + return self.m_clusterNodeName + + def setShouldAlwaysBeOn( self ): + print('%s should always be on' % (self.getName()) ) + self.m_bShouldAlwaysBeOn = True + + def pushRequest( self, request ): + self.m_stateLock.acquire() + self.m_pendingRequestsQueue.append(request) + self.m_stateLock.release() + + def popRequest( self ): + oldestRequest = None + self.m_stateLock.acquire() + if len(self.m_pendingRequestsQueue) != 0: + oldestRequest = self.m_pendingRequestsQueue.pop(0) + self.m_stateLock.release() + return oldestRequest + + def run( self ): + try: + + while not self.m_bStop : + # handle the oldest request + request = self.popRequest() + if request != None : + request.process( self ) + + # schedule a power state check if required + currentTime = time.time() + if self.m_bCheckPowerState: + if not self.m_bShouldAlwaysBeOn: # don't do power checks on such machines because some current implementations of + # operations involved might cause the machine to go to sleep + if (not self.m_lastPowerStateCheckTime) or (currentTime > (self.m_lastPowerStateCheckTime + ClusterNodeStatusUpdater.DELAY_BETWEEN_POWERSTATE_CHECKS)): + self.pushRequest( CheckPowerStateRequest() ) + + time.sleep(1) + except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt) + Util.onException(exception) + + def requestSleep( self, sleepCompleteNotifier = None ): + assert( self.m_bShouldAlwaysBeOn == False ) + self.pushRequest( SleepRequest( sleepCompleteNotifier ) ) + + def requestWakeUp( self, wakeUpNotifier = None ): + assert( self.m_bShouldAlwaysBeOn == False ) + self.pushRequest( WakeUpRequest( wakeUpNotifier ) ) + + def getQueueMachineName( self ): + return self.m_clusterNode.getQueueMachineName() + + def setQueueActivation( self, bEnable ): + """ + @return true on success, false otherwise + """ + return self.getGridEngine().setQueueInstanceActivation( self.getQueueMachineName(), bEnable ) + + def queueIsEmpty( self ): + return self.getGridEngine().queueIsEmpty( self.getName() ) diff --git a/ClusterController/ClusterStatus.py b/ClusterController/ClusterStatus.py new file mode 100644 index 0000000..7e9d877 --- /dev/null +++ b/ClusterController/ClusterStatus.py @@ -0,0 +1,185 @@ +import threading +from JobsStateUpdater import * +import Lib.Util +import Lib.SimpaDbUtil +from ClusterNode import * +import time + +class ClusterStatus: + """ + The current state (jobs, sensors) of the cluster + + @param gridEngine the interface to the batch job tool (in our case it's sun grid engine) + """ + def __init__(self, gridEngine): + self.m_gridEngine = gridEngine + self.m_clusterNodes = {} + self.m_lock = threading.Lock() # to prevent concurrent access to this instance + self.m_jobsStateUpdater = JobsStateUpdater( self ) + self.m_jobsState = None + #self.m_controlledMachineNames = [ 'simpatix26', 'simpatix27', 'simpatix38', 'simpatix10' ] + #self.m_controlledMachineNames = [ 'simpatix15' ] + self.m_controlledMachineNames = [] # [ 'simpatix10' ] + if True: + for iMachine in range(11, 40): + if (iMachine == 31) or (iMachine == 32): + continue # these machines don't seem to be able to go to sleep properly (bug 00000010) + if (iMachine == 18): + continue # this machine needs maintenance (restarting because it's very slow for an unknown reason) + self.m_controlledMachineNames.append( 'simpatix%d' % iMachine ) + nodeNames = Lib.SimpaDbUtil.getClusterMachinesNames() + for nodeName in nodeNames: + if nodeName in self.m_controlledMachineNames: + logInfo( 'machine %s is under the cluster controller\'s control' % nodeName ) + clusterNode = ClusterNode( nodeName, self, gridEngine ) + if nodeName == 'simpatix10': + clusterNode.setShouldAlwaysBeOn() + self.m_clusterNodes[ nodeName ] = clusterNode + return + + def getGridEngine( self ): + return self.m_gridEngine + + def getMachines( self ): + return self.m_clusterNodes + + def startReadingThreads( self ): + for k, v in self.m_clusterNodes.iteritems(): + v.m_machineStatusUpdater.start() + self.m_jobsStateUpdater.start() + + def stopReadingThreads( self ): + for k, v in self.m_clusterNodes.iteritems(): + v.m_machineStatusUpdater.m_bStop = True + v.m_machineStatusUpdater.join() + self.m_jobsStateUpdater.m_bStop = True + self.m_jobsStateUpdater.join() + + def onNewJobsState( self, newJobsState ): + #logDebug( 'ClusterStatus::onNewJobsState : attempting to acquire lock to access m_jobsState' ) + self.m_lock.acquire() + #logDebug( 'ClusterStatus::onNewJobsState : got lock to access m_jobsState' ) + self.m_jobsState = newJobsState + self.m_lock.release() + + def getJobsOnMachine( self, machineName ): + return self.m_jobsState.getJobsOnMachine( machineName ) + + def isReady( self ): + for k, v in self.m_clusterNodes.iteritems(): + if not v.isReady(): + logInfo( 'ClusterStatus::isReady : not ready because of ' + v.getName() ) + return False + #log('ClusterStatus::isReady() : '+k+' is ready') + #assert( False ) + if self.m_jobsState == None: + logInfo( 'ClusterStatus::isReady : not ready because waiting for jobs state' ) + return False + return True + + def getIdleMachines( self ): + assert( self.isReady ) + bBUG_00000009_IS_STILL_ALIVE = True + if bBUG_00000009_IS_STILL_ALIVE: + currentTime = time.time() + fJOBS_STATE_MAX_ALLOWED_AGE = 3600 + fJobsStateAge = currentTime - self.m_jobsState.getTime() + if fJobsStateAge > fJOBS_STATE_MAX_ALLOWED_AGE: + logError('ClusterStatus::getIdleMachines : age of jobs state is too old (%f s). This is bug 00000009.' % (fJobsStateAge)) + assert( False ) + idleMachines = {} + for machineName, machine in self.m_clusterNodes.iteritems(): + if machine.getPowerState() == PowerState.ON: + jobsOnThisMachine = self.getJobsOnMachine( machineName ) + if len(jobsOnThisMachine) == 0: + idleMachines[ machineName ] = machine + return idleMachines + + def getPendingJobs( self ): + return self.m_jobsState.getPendingJobs() + + def getJobsState( self ): + return self.m_jobsState + + def queueMachineFitsJobRequirements( self, queueMachine, jobRequirements ): + if jobRequirements.m_queues: + bQueueIsInAllowedQueues = False + for queueName in jobRequirements.m_queues: + if queueName == queueMachine.getQueueName(): + bQueueIsInAllowedQueues = True + if not bQueueIsInAllowedQueues: + logInfo('queueMachineFitsJobRequirements : queueMachine '+queueMachine.getName()+' rejected because it\'s not in the allowed queues') + return False + return True + + def getEnergyConsumption( self ): + """ + returns an estimate of the energy consumption since the start of the cluster controller (in joules) + """ + fEnergyConsumption = 0.0 + for machine in self.m_clusterNodes.itervalues(): + fEnergyConsumption += machine.getEnergyConsumption() + return fEnergyConsumption + + def getEnergySavings( self ): + """ + returns an estimate of the energy saving since the start of the cluster controller (in joules) + """ + fEnergySavings = 0.0 + for machine in self.m_clusterNodes.itervalues(): + fEnergySavings += machine.getEnergySavings() + return fEnergySavings + + def getCurrentPowerConsumption( self ): + fPowerConsumption = 0.0 + for machine in self.m_clusterNodes.itervalues(): + fPowerConsumption += machine.getPowerConsumption() + return fPowerConsumption + + def getCurrentPowerSavings( self ): + fPowerSavings = 0.0 + for machine in self.m_clusterNodes.itervalues(): + fPowerSavings += machine.getPowerConsumptionForPowerState( PowerState.ON ) - machine.getPowerConsumption() + return fPowerSavings + + def getNumControlledSlots( self ): + self.m_lock.acquire() + iNumControlledSlots = 0 + for machine in self.m_clusterNodes.itervalues(): + queueMachine = self.m_jobsState.getQueueMachine( machine.getName() ) + iNumControlledSlots += queueMachine.getNumSlots() + self.m_lock.release() + return iNumControlledSlots + + def getNumUsedSlots( self ): + self.m_lock.acquire() + iNumUsedSlots = 0 + for machine in self.m_clusterNodes.itervalues(): + queueMachine = self.m_jobsState.getQueueMachine( machine.getName() ) + iNumUsedSlotsOnThisMachine = queueMachine.getNumSlots() - self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine) + assert(iNumUsedSlotsOnThisMachine >= 0) + iNumUsedSlots += iNumUsedSlotsOnThisMachine + self.m_lock.release() + return iNumUsedSlots + + def getNumWastedSlots( self ): + self.m_lock.acquire() + iNumWastedSlots = 0 + for machine in self.m_clusterNodes.itervalues(): + if machine.getPowerState() == PowerState.ON: + queueMachine = self.m_jobsState.getQueueMachine( machine.getName() ) + iNumWastedSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine) + self.m_lock.release() + return iNumWastedSlots + + def getNumSleepingSlots( self ): + self.m_lock.acquire() + iNumSleepingSlots = 0 + for machine in self.m_clusterNodes.itervalues(): + if machine.getPowerState() == PowerState.SLEEP: + queueMachine = self.m_jobsState.getQueueMachine( machine.getName() ) + iNumSleepingSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine) + self.m_lock.release() + return iNumSleepingSlots + + diff --git a/ClusterController/Install.py b/ClusterController/Install.py new file mode 100755 index 0000000..90a88e7 --- /dev/null +++ b/ClusterController/Install.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +""" + script that installs ClusterController on simpatix10 + to start ClusterController : + launchctl start fr.univ-rennes1.ipr.ClusterController +""" +import sys +sys.path.insert(0, '..') +from Lib.Util import * +import os + +if __name__ == '__main__': + machineName = 'simpatix10' + strThisDir = os.getcwd() + strPythonDevDir = strThisDir + '/..' + print( 'installing ClusterController on '+machineName ) + remoteCommand = '' + remoteCommand += 'mkdir -p /usr/local/bin/ipr/Python;' + remoteCommand += 'rm -r /usr/local/bin/ipr/Python/Lib;' + remoteCommand += 'rm -r /usr/local/bin/ipr/Python/ClusterController;' + remoteCommand += 'cp -r %s/Lib /usr/local/bin/ipr/Python/;' % strPythonDevDir + remoteCommand += 'cp -r %s/ClusterController /usr/local/bin/ipr/Python/;' % strPythonDevDir + remoteCommand += 'cp %s/ClusterController/ClusterController.plist /Library/LaunchDaemons/fr.univ-rennes1.ipr.ClusterController.plist;' % strPythonDevDir + remoteCommand += 'cp -r %s/ClusterController/ClusterControllerLauncher.sh /usr/local/bin/ipr/Python/ClusterController/;' % strPythonDevDir + remoteCommand += 'launchctl unload /Library/LaunchDaemons/fr.univ-rennes1.ipr.ClusterController.plist;' + remoteCommand += 'launchctl load /Library/LaunchDaemons/fr.univ-rennes1.ipr.ClusterController.plist;' + command = 'ssh root@'+ machineName +' "'+remoteCommand+'"' + ( returnCode, stdout, stderr ) = executeCommand( command ) + for strSingleCommand in remoteCommand.split(';'): + print(strSingleCommand) + print(stdout) + print(stderr) + if returnCode == 0: + print('install succeeded on '+machineName) + else: + print('install failed on '+machineName+' (see below for detail)') + print stderr + #assert( False ) + + + diff --git a/ClusterController/Job.py b/ClusterController/Job.py new file mode 100644 index 0000000..5efa861 --- /dev/null +++ b/ClusterController/Job.py @@ -0,0 +1,102 @@ + +class JobStateFlags: + RUNNING=1 # the job is running + WAITING=2 # the job is waiting + QUEUED=4 # not sure what that exactly means but it reflects the q state of jobs as seen in the pending jobs list from qstat -f -u \* + TRANSFERING=8 + +class ParallelEnvironment: + MPI=1 + +class JobRequirements: + def __init__( self ): + self.m_numSlots = None + self.m_strArchitecture = None # machine architecture + self.m_parallelEnvironment = None + self.m_queues = None # the list of queues this job is allowed to run on + + +class JobId: + """ + the identifier of a job. + We treat each element of a job array as a separate job + A single integer is no longer enough to identify a job because all elements in a job array + share the same sge job identifier. To uniquely define a job array element, we also use the task id. + """ + def __init__( self, iJobId, iJobArrayElementId = None): + self.m_iJobId = iJobId + self.m_iJobArrayElementId = iJobArrayElementId # None if this identifier does not refer to a job array element + + def __hash__( self ): + """ + required to use a JobId as a dict hash key + """ + return self.m_iJobId # very simple hashing that conflicts only for job array elements + + def __eq__( self, other ): + """ + required to use a JobId as a dict hash key + """ + if self.m_iJobId != other.m_iJobId: + return False + if self.m_iJobArrayElementId != other.m_iJobArrayElementId: + return False + return True + + def isJobArrayElement( self ): + return (self.m_iJobArrayElementId != None) + + def asStr( self ): + strResult = '%s' % self.m_iJobId + if self.isJobArrayElement(): + strResult += '.%d' % self.m_iJobArrayElementId + return strResult + + + +class Job: + def __init__( self, jobId ): + self.m_jobId = jobId + self.m_startTime = None + self.m_submitTime = None + self.m_owner = None + self.m_scriptName = None + self.m_slots = {} + self.m_stateFlags = 0 + self.m_jobRequirements = JobRequirements() + def getId( self ): + return self.m_jobId + def setState( self, state ): + self.m_stateFlags = state + def setOwner( self, jobOwner ): + if self.m_owner: + assert( self.m_owner == jobOwner ) + self.m_owner = jobOwner + def getOwner( self ): + return self.m_owner + def setStartTime( self, jobStartTime ): + if self.m_startTime: + assert( self.m_startTime == jobStartTime ) + self.m_startTime = jobStartTime + def setSubmitTime( self, jobSubmitTime ): + if self.m_submitTime: + assert( self.m_submitTime == jobSubmitTime ) + self.m_submitTime = jobSubmitTime + def getStartTime( self ): + return self.m_startTime + def setScriptName( self, jobScriptName ): + if self.m_scriptName: + assert( self.m_scriptName == jobScriptName ) + self.m_scriptName = jobScriptName + def addSlots( self, machineName, numSlots ): + assert( self.m_slots.get( machineName ) == None ) + self.m_slots[ machineName ] = numSlots + def getSlots( self ): + return self.m_slots + def setNumRequiredSlots( self, numSlots ): + self.m_jobRequirements.m_numSlots = numSlots + def isPending( self ): + """ + returns true if this job is waiting in the queue for whatever reason + """ + return self.m_stateFlags & JobStateFlags.QUEUED \ No newline at end of file diff --git a/ClusterController/JobsState.py b/ClusterController/JobsState.py new file mode 100644 index 0000000..141586d --- /dev/null +++ b/ClusterController/JobsState.py @@ -0,0 +1,71 @@ +from Log import * + +class JobsState: + """ + represents a snapshot of the state of SGE jobs as seen by the SGE command "qstat -f -u \*" + """ + def __init__( self ): + self.m_jobs = {} # list of jobs + self.m_queueMachines = {} # list of queue machines such as allintel.q@simpatix10 + self.m_stateTime = None # the time at which the state was snapshot + + def deleteAllJobs( self ): + self.m_jobs = {} + + def addJob( self, job ): + self.m_jobs[ job.getId() ] = job + + def getJob( self, jobId ): + return self.m_jobs.get( jobId ) + + def setTime( self, stateTime ): + self.m_stateTime = stateTime + + def getTime( self ): + return self.m_stateTime + + def getJobsOnMachine( self, machineName ): + jobsOnMachine = {} + for jobId, job in self.m_jobs.iteritems(): + if job.getSlots().get(machineName): + jobsOnMachine[ jobId ] = job + return jobsOnMachine + + def getNumFreeSlotsOnQueueMachine( self, queueMachine ): + #logInfo('getNumFreeSlotsOnQueueMachine : looking for free slots on queuemachine %s' % queueMachine.getName() ) + numUsedSlots = 0 + for job in self.m_jobs.itervalues(): + numUsedSlotsByThisJob = job.getSlots().get( queueMachine.getMachineName() ) + if numUsedSlotsByThisJob != None: + #logInfo('getNumFreeSlotsOnQueueMachine : job %d uses %d slots' % (job.getId().asStr(), numUsedSlotsByThisJob) ) + numUsedSlots += numUsedSlotsByThisJob + else: + None + #logInfo('getNumFreeSlotsOnQueueMachine : job %d uses no slot' % job.getId().asStr() ) + numFreeSlots = queueMachine.getNumSlots() - numUsedSlots + assert( numFreeSlots >= 0 ) + return numFreeSlots + + def addQueueMachine( self, queueMachine ): + self.m_queueMachines[ queueMachine.getName() ] = queueMachine + + def getQueueMachine( self, machineName ): + """ + finds the queue machine associated with a machine + """ + queueMachine = None + for qmName, qm in self.m_queueMachines.iteritems(): + if qm.m_machineName == machineName: + assert( queueMachine == None ) # to be sure that no more than one queue machine is on a given machine + queueMachine = qm + return queueMachine + + def getQueueMachines( self ): + return self.m_queueMachines + + def getPendingJobs( self ): + pendingJobs = {} + for jobId, job in self.m_jobs.iteritems(): + if job.isPending(): + pendingJobs[ job.getId() ] = job + return pendingJobs diff --git a/ClusterController/JobsStateUpdater.py b/ClusterController/JobsStateUpdater.py new file mode 100644 index 0000000..845c39a --- /dev/null +++ b/ClusterController/JobsStateUpdater.py @@ -0,0 +1,35 @@ +import threading +import Util +import os +import traceback +import sys +import time + +class JobsStateUpdater( threading.Thread ): + DELAY_BETWEEN_STATUS_CHECKS=10 # in seconds + def __init__( self, clusterStatus ): + threading.Thread.__init__(self) + self.m_clusterStatus = clusterStatus + self.m_bStop = False + + def getName( self ): + return 'JobsStateUpdater' + + def getGridEngine( self ): + return self.m_clusterStatus.getGridEngine() + + def updateClusterStatus( self ): + #log('JobsStateUpdater::updateClusterStatus : start') + + jobsState = self.getGridEngine().getCurrentJobsState() + # update the jobs in the cluster status + self.m_clusterStatus.onNewJobsState( jobsState ) + #log('JobsStateUpdater::updateClusterStatus : end') + + def run( self ): + try: + while not self.m_bStop : + self.updateClusterStatus() + time.sleep(JobsStateUpdater.DELAY_BETWEEN_STATUS_CHECKS) + except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt) + Util.onException(exception) diff --git a/ClusterController/Log.py b/ClusterController/Log.py new file mode 100644 index 0000000..b0aa283 --- /dev/null +++ b/ClusterController/Log.py @@ -0,0 +1,30 @@ +import time +import threading + +gLogFilePath = '/var/log/ClusterController.log' + +def log( message ): + return + threadName = threading.currentThread().getName() + logMessage = time.asctime(time.localtime())+' : '+ threadName + ' : ' + message + print logMessage + f = open(gLogFilePath, 'a+') + assert( f ) + try: + f.write( logMessage + '\n' ) + finally: + f.close() + +def logDebug( message ): + log('[D]'+message) + return + +def logInfo( message ): + log('[I]'+message) + +def logWarning( message ): + log('[W]'+message) + +def logError( message ): + log('[E]'+message) + diff --git a/ClusterController/Main.py b/ClusterController/Main.py new file mode 100644 index 0000000..c743de5 --- /dev/null +++ b/ClusterController/Main.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python +import sys +sys.path.insert(0, '..') +import os +from Lib.Util import * +from Lib.SimpaDbUtil import * +import time +from ClusterStatus import ClusterStatus +from SlotAllocator import * +from Log import * +from ClusterNodeStatusUpdater import * +from SunGridEngine import SunGridEngine +import Util + +from HTMLParser import HTMLParser + +VERSION='1.18' + +class MyHTMLParser(HTMLParser): + def __init__(self): + HTMLParser.__init__(self) + self.TokenList = [] + def handle_data( self,data): + data = data.strip() + if data and len(data) > 0: + self.TokenList.append(data) + #print data + def GetTokenList(self): + return self.TokenList + + +class WakeUpCompleteNotifier( IWakeUpCompleteNotifier ): + def __init__(self, machineName, clusterController): + self.m_machineName = machineName + self.m_clusterController = clusterController + def onWakeUpComplete( self ): + logDebug('WakeUpCompleteNotifier::onWakeUpComplete : start') + self.m_clusterController.onMachineWakeUpComplete( self.m_machineName ) + +class SleepCompleteNotifier( ISleepCompleteNotifier ): + def __init__(self, machineName, clusterController): + self.m_machineName = machineName + self.m_clusterController = clusterController + def onSleepComplete( self, bSleepSucceeded ): + logDebug('WakeUpCompleteNotifier::onWakeUpComplete : start') + self.m_clusterController.onMachineSleepComplete( self.m_machineName, bSleepSucceeded ) + +def jouleToKwh( fEnergyInJoules ): + """ + converts joules to kWH + """ + # 1 kWh = 1000 * 3600 J + return fEnergyInJoules / (1000.0 * 3600.0) + +class ClusterController: + """ + The cluster controller monitors the cluster's activity and has multiple purposes : + - energy saving : it can put some machines to sleep if they have nothing to do, or it + can wake them up when needed (eg when a new job has arrived) + - auto-repair : for examples + - it happened sometimes that sge_execd process disappeared for some unknown reason + in that case, the cluster controller can detect it and restart the daemon + automatically, without administrator's intervention + - clear the Error state of queues + - it could also be used to dynamically adapt sge's settings to the requirements of + jobs (eg add some machines to a queue). + Mechanism to let user get priority + """ + def __init__( self ): + gridEngine = SunGridEngine() + self.m_clusterStatus = ClusterStatus( gridEngine ) + self.m_slotAllocator = DecoupledSlotAllocator() #SimpleSlotAllocator() + self.m_machinesThatNeedWakeUp = {} + self.m_machinesThatNeedWakeupLock = threading.Lock() # to prevent concurrent access to m_machinesThatNeedWakeUp + self.m_machinesThatNeedSleeping = {} + self.m_machinesThatNeedSleepingLock = threading.Lock() # to prevent concurrent access to m_machinesThatNeedSleeping + self.m_lastEnergyStatusLogTime = None + self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60 # in seconds + self.m_iSessionId = None # session (run) identifier in database + + def getClusterStatus( self ): + return m_clusterStatus + + def log( self, message ): + print message + + def shutdownLeastImportantNode( self ): + self.log("ClusterController::shutdownLeastImportantNode : start") + + def onMachineWakeUpComplete( self, machineName ): + self.m_machinesThatNeedWakeupLock.acquire() + #logDebug('ClusterController::onMachineWakeUpComplete : machine %s old len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)) ) + del self.m_machinesThatNeedWakeUp[ machineName ] + #logDebug('ClusterController::onMachineWakeUpComplete : machine %s new len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)) ) + self.m_machinesThatNeedWakeupLock.release() + logDebug('ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it\'s now awake' % machineName) + + def onMachineSleepComplete( self, machineName, bSleepSucceeded ): + self.m_machinesThatNeedSleepingLock.acquire() + #logDebug('ClusterController::onMachineSleepComplete : machine %s old len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)) ) + del self.m_machinesThatNeedSleeping[ machineName ] + #logDebug('ClusterController::onMachineSleepComplete : machine %s new len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)) ) + self.m_machinesThatNeedSleepingLock.release() + if bSleepSucceeded: + logDebug('ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it\'s now awake' % machineName) + else: + logDebug('ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it can\'t be put to sleep at the moment (eg a job just arrived)' % machineName) + + def getNumPendingWakeUps( self ): + self.m_machinesThatNeedWakeupLock.acquire() + numPendingWakeUps = len(self.m_machinesThatNeedWakeUp) + self.m_machinesThatNeedWakeupLock.release() + return numPendingWakeUps + + def getNumPendingSleeps( self ): + self.m_machinesThatNeedSleepingLock.acquire() + numPendingSleeps = len(self.m_machinesThatNeedSleeping) + self.m_machinesThatNeedSleepingLock.release() + return numPendingSleeps + + def putIdleMachinesToSleep( self ): + self.m_clusterStatus.m_lock.acquire() + idleMachines = self.m_clusterStatus.getIdleMachines() + # logInfo('idleMachines :') + self.m_machinesThatNeedToSleep = [] + for machineName, idleMachine in idleMachines.iteritems(): + if idleMachine.getPowerState() == PowerState.ON: + # logInfo('\t%s' % machineName) + if idleMachine.getName() != 'simpatix10': # never put simpatix10 to sleep because it's the sge master and is also server for other things + self.m_machinesThatNeedSleeping[idleMachine.getName()]=idleMachine + self.m_clusterStatus.m_lock.release() + + listOfMachinesThatNeedSleeping = self.m_machinesThatNeedSleeping.values() # duplicate the list so that we don't iterate on m_machinesThatNeedSleeping, which could cause a runtime error because callbacks alter m_machinesThatNeedWakeUp + for machine in listOfMachinesThatNeedSleeping: + logInfo('ClusterController::putIdleMachinesToSleep : requesting sleep for %s because it\'s idle' % machine.getName()) + machine.requestSleep( SleepCompleteNotifier( machine.getName(), self ) ) + + if len(listOfMachinesThatNeedSleeping) != 0: + # hack : wait until the sleep requests are handled so that we don't request the same machine to sleep multiple times + while self.getNumPendingSleeps() > 0: + time.sleep(1) + + + def wakeUpMachinesForPendingJobs(self): + listOfMachinesThatNeedWakeUp = [] + + self.m_clusterStatus.m_lock.acquire() + pendingJobs = self.m_clusterStatus.getPendingJobs() + """ + logInfo('pending jobs :') + for job in pendingJobs.itervalues(): + logInfo('\t%d' % job.getId().asStr()) + """ + if len(pendingJobs) != 0: + self.m_machinesThatNeedWakeUp = self.m_slotAllocator.getMachinesThatNeedWakeUp( pendingJobs, self.m_clusterStatus ) + if len(self.m_machinesThatNeedWakeUp) == 0: + None + #logInfo('ClusterController::updateNormalState : no machine needs waking up' ) + else: + listOfMachinesThatNeedWakeUp = self.m_machinesThatNeedWakeUp.values() # duplicate the list so that we don't iterate on m_machinesThatNeedWakeUp, which would cause a runtime error because callbacks alter m_machinesThatNeedWakeUp + for machine in listOfMachinesThatNeedWakeUp: + logInfo('ClusterController::wakeUpMachinesForPendingJobs : requesting wake up for '+machine.getName() ) + machine.requestWakeUp( WakeUpCompleteNotifier( machine.getName(), self ) ) + self.m_clusterStatus.m_lock.release() + + if len(listOfMachinesThatNeedWakeUp) != 0: + # hack : wait until the wakeup requests are handled so that a later sleep request doesn't cancel it + # and also wait for the jobs to come in + while self.getNumPendingWakeUps() > 0: + time.sleep(1) + iSGE_CHEK_RUNNABLE_JOBS_DELAY = 60 * 5 # max time it takes for sge between the fact that a queued job is runnable and SGE actually starting it (I've put a long time here because sometimes, qstat takes a long time to ralise that the machine is available after I wake it up) + logInfo('ClusterController::wakeUpMachinesForPendingJobs : all required machines are awake. Now give %d seconds to SGE to allocate slots.' % iSGE_CHEK_RUNNABLE_JOBS_DELAY) + # wait until SGE has a chance to allocate slots + time.sleep(iSGE_CHEK_RUNNABLE_JOBS_DELAY) # note : this is annoying because it blocks the main thread. This could be improved if we forbid the machines to go to sleep for that much time.... + logInfo('ClusterController::wakeUpMachinesForPendingJobs : end of the delay given to SGE to allocate slots') + + def updateNormalState( self ): + # attempt to shut down machines that are idle + self.putIdleMachinesToSleep() + # wake up necessary machines if there are pending jobs + self.wakeUpMachinesForPendingJobs() + + def storeSessionInDatabase( self ): + conn = MySQLdb.connect('simpatix10', 'root', '', 'clustercontroller') + assert(conn) + + # retrieve the session id, as it's an auto_increment field + sqlCommand = "SELECT AUTO_INCREMENT FROM information_schema.TABLES WHERE TABLE_SCHEMA = 'clustercontroller' AND TABLE_NAME = 'sessions_desc'" + print sqlCommand + conn.query(sqlCommand) + r=conn.store_result() + iSessionId = r.fetch_row()[0][0] + + # stores information about the session + sqlCommand = "INSERT INTO `sessions_desc` (`start_time`, end_time, `program_version`, `machine_name`, `pid`, num_controlled_machines) VALUES (NOW(), NOW(), '%s', 'simpatix10', %d, %d);" % (VERSION, os.getpid(), len(self.m_clusterStatus.m_clusterNodes)) + print sqlCommand + conn.query(sqlCommand) + + # initialize the energy savings table + sqlCommand = "INSERT INTO session_to_energy_savings (session_id, energy_savings_kwh) VALUES (%d,0.0);" % (iSessionId) + print sqlCommand + conn.query(sqlCommand) + + conn.close() + print( 'Session Iid = %d' % iSessionId ) + return iSessionId + + def updateSessionEnergyConsumptionInDatabase( self ): + conn = MySQLdb.connect('simpatix10', 'root', '', 'clustercontroller') + assert(conn) + + # update energy savings for the current session + sqlCommand = "UPDATE session_to_energy_savings SET energy_savings_kwh=%f WHERE session_id=%d;" % ( jouleToKwh(self.m_clusterStatus.getEnergySavings()) ,self.m_iSessionId) + print sqlCommand + conn.query(sqlCommand) + + # update the end time of the current session + sqlCommand = "UPDATE sessions_desc SET end_time=NOW() WHERE session_id=%d;" % (self.m_iSessionId) + print sqlCommand + conn.query(sqlCommand) + + conn.close() + + + def run( self ): + """ + """ + self.m_iSessionId = self.storeSessionInDatabase() + DELAY_BETWEEN_MEASURES = 10 # in seconds + self.m_clusterStatus.startReadingThreads() + while not self.m_clusterStatus.isReady(): + #log('waiting for system to be ready') + time.sleep(1) + None + logInfo('ClusterController::run : cluster initial readings have completed') + startTime = time.localtime() + while True: + currentTime = time.time() + #clusterStatus.m_nodesStatus['simpatix10'].dump() + if (not self.m_lastEnergyStatusLogTime) or (currentTime > (self.m_lastEnergyStatusLogTime +self.DELAY_BETWEEN_ENERGY_STATUS_LOGS)): + iNumMachines = len(self.m_clusterStatus.m_clusterNodes) + iNumMachinesOn = 0 + iNumSleepingMachines = 0 + for machine in self.m_clusterStatus.m_clusterNodes.itervalues(): + ePowerState = machine.getPowerState() + if ePowerState == PowerState.ON: + iNumMachinesOn+=1 + elif ePowerState == PowerState.SLEEP: + iNumSleepingMachines+=1 + logInfo('%d machines (%d ON, %d SLEEPING)' % (iNumMachines, iNumMachinesOn, iNumSleepingMachines)) + iNumSlots = self.m_clusterStatus.getNumControlledSlots() + iNumUsedSlots = self.m_clusterStatus.getNumUsedSlots() + iNumWastedSlots = self.m_clusterStatus.getNumWastedSlots() + iNumSleepingSlots = self.m_clusterStatus.getNumSleepingSlots() + logInfo('%d slots (%d used, %d wasted, %d sleeping)' % (iNumSlots, iNumUsedSlots, iNumWastedSlots, iNumSleepingSlots )) + logInfo('cluster estimated power consumption : %f W (saving from cluster controller : %f W)' % (self.m_clusterStatus.getCurrentPowerConsumption(), self.m_clusterStatus.getCurrentPowerSavings()) ) + logInfo('cluster estimated energy consumption since %s : %f kWh (saving from cluster controller : %f kWh)' % (time.asctime(startTime), jouleToKwh(self.m_clusterStatus.getEnergyConsumption()), jouleToKwh(self.m_clusterStatus.getEnergySavings()))) + self.updateSessionEnergyConsumptionInDatabase() + self.m_lastEnergyStatusLogTime = currentTime + + self.updateNormalState() + time.sleep(DELAY_BETWEEN_MEASURES) + self.m_clusterStatus.stopReadingThreads() + + +def storeClusterNodeStatus( clusterNodeStatus ): + #conn = MySQLdb.connect('simpatix10', 'measures_writer', '', 'simpa_measurements') + conn = MySQLdb.connect('simpatix10', 'root', '', 'simpa_measurements') + assert(conn) + #conn.query("""INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('titi', 2000, NOW());""") + ''' + conn.query("""SELECT * FROM fan_rpm_logs""") + r=conn.store_result() + print r.fetch_row()[0] + ''' + for key, sensor in clusterNodeStatus.m_sensors.iteritems(): + sensorId = clusterNodeStatus.m_clusterNodeName + '_' + sensor.m_name + if sensor.typeName() == 'Fan': + sqlCommand = """INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('"""+sensorId+"""', """+str(sensor.m_rpms)+""", NOW());""" + print sqlCommand + conn.query(sqlCommand) + elif sensor.typeName() == 'Temperature': + sqlCommand = """INSERT INTO `temperature_logs` (`temp_sensor_id`, `temperature`, `date`) VALUES ('"""+sensorId+"""', """+str(sensor.m_temperature)+""", NOW());""" + print sqlCommand + conn.query(sqlCommand) + else: + assert(False) + conn.close() + +if __name__ == '__main__': + #Lib.Util.sendTextMail( 'SimpaCluster ', 'guillaume.raffy@univ-rennes1.fr', 'mail subject', 'mail content') + try: + logInfo('ClusterController v. %s starting....' % VERSION) + #executeCommand('ping -o -t 1 simpatix310 > /dev/null') + #print executeCommand('ssh simpatix10 "ipmitool sensor"') + #assert False, 'prout' + controller = ClusterController() + controller.run() + #machineNameToMacAddress( 'simpatix10' ) + #except AssertionError, error: + #except KeyboardInterrupt, error: + except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt) + Util.onException(exception) diff --git a/ClusterController/PowerState.py b/ClusterController/PowerState.py new file mode 100644 index 0000000..039aa6a --- /dev/null +++ b/ClusterController/PowerState.py @@ -0,0 +1,21 @@ + +class PowerState: + UNKNOWN=0 + OFF=1 + ON=2 + SLEEP=3 + UNPLUGGED=4 + +def PowerStateToStr( powerState ): + if powerState == PowerState.UNKNOWN: + return 'UNKNOWN' + if powerState == PowerState.OFF: + return 'OFF' + if powerState == PowerState.ON: + return 'ON' + if powerState == PowerState.SLEEP: + return 'SLEEP' + if powerState == PowerState.UNPLUGGED: + return 'UNPLUGGED' + else: + assert( False ) diff --git a/ClusterController/QstatParser.py b/ClusterController/QstatParser.py new file mode 100644 index 0000000..164594c --- /dev/null +++ b/ClusterController/QstatParser.py @@ -0,0 +1,185 @@ +import StringIO +import re +from JobsState import * +from QueueMachine import * +from Util import * +from Log import * +from Job import * + +class QstatParser: + def parseJobState( self, strJobStatus ): + jobState = 0 + for i in range(0, len(strJobStatus) ): + c = strJobStatus[i] + if c == 'r': + jobState += JobStateFlags.RUNNING + elif c == 'w': + jobState += JobStateFlags.WAITING + elif c == 'q': + jobState += JobStateFlags.QUEUED + elif c == 't': + jobState += JobStateFlags.TRANSFERING + else: + assert( False, 'unhandled job state flag :"' + c + '"' ) + return jobState + def parseQstatOutput( self, qstatOutput ): + jobsState = JobsState() + f = StringIO.StringIO(qstatOutput) + line = f.readline() + currentQueueMachine = None + bInPendingJobsSection = False + # examples of job line : + # 43521 0.55108 Confidiso3 aghoufi r 08/19/2009 18:40:09 1 + # a typical job line in the pending jobs section looks like this : + # 43645 0.00000 LC_LV_MC aghoufi qw 08/21/2009 08:14:58 1 + # a typical running job array line looks like this + # 43619 0.56000 SimpleJobA raffy r 08/20/2009 18:13:03 1 3 + # a typical job array line in the pending jobs section looks like this + # 43646 0.00000 SimpleJobA raffy qw 08/21/2009 09:56:40 1 1-4:1 + jobRegularExp = re.compile( '^[ ]*(?P[^ ]+)[ ]+[0-9.]+[ ]+(?P[^ ]+)[ ]+(?P[^ ]+)[ ]+(?P[^ ]+)[ ]+(?P[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+(?P[0-9]+)[ ]+(?P[^\n]*)[\s]*$' ) + # example of machine line : + # allintel.q@simpatix34.univ-ren BIP 0/6/8 6.00 darwin-x86 + machineRegularExp = re.compile( '^(?P[^@]+)@(?P[^.]+)[^ ]+[ ]+(?P[^ ]+)[ ]+(?P[^/]+)/(?P[^/]+)/(?P[^ ]+)[?]*' ) + pendingJobsHeaderRegularExp = re.compile( '^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS[?]*' ) + while( len(line) > 0 ): + # print line + # check if the current line is a line describing a job running on a machine + matchObj = jobRegularExp.match( line ) + if matchObj: + # we are dealing with a job line + if not bInPendingJobsSection: + assert( currentQueueMachine ) + #log('QstatParser::parseQstatOutput : jobId = "'+matchObj.group('jobId')+'"') + iJobId = int(matchObj.group('jobId')) + jobState = self.parseJobState( matchObj.group('jobStatus') ) + strJobArrayDetails = matchObj.group('jobArrayDetails') + bIsJobArray = (len(strJobArrayDetails) != 0) + #logDebug('strJobArrayDetails = "%s", bIsJobArray=%d' % (strJobArrayDetails, int(bIsJobArray))) + # each element of a job array is treated as a separate job for the sake of simplicity. + # For these elements, the job id in sge sense is the same, but they are different in this program's sense + jobElementsIndexRange = range(0,1) # just one element, unless it's a job array + if bIsJobArray: + if bInPendingJobsSection: + jobElementsIndexRange = [] + astrRanges = re.split(',', strJobArrayDetails) + for strRange in astrRanges: + singleIndexMatch = re.match('^(?P[0-9]+)$', strRange) + if singleIndexMatch: + iElementIndex = int(singleIndexMatch.group('elementIndex')) + jobElementsIndexRange.extend(range(iElementIndex, iElementIndex+1)) + else: + # we expect strRange to be of the form "1-4:1", where : + # the 1st number is the min element index (sge imposes it to be greater than 0) + # the 2nd number is the max element index + # the 3rd number is the step between consecutive element indices + rangeMatch = re.match( '^(?P[0-9]+)-(?P[0-9]+):(?P[0-9]+)$', strRange) + if rangeMatch == None: + logError('unexpected format for job array details : "%s" (line="%s"' % (strRange, line) ) + assert(False) + iMinElementIndex=int(rangeMatch.group('minElementIndex')) + iMaxElementIndex=int(rangeMatch.group('maxElementIndex')) + iStepBetweenIndices=int(rangeMatch.group('stepBetweenIndices')) + jobElementsIndexRange.extend(range(iMinElementIndex, iMaxElementIndex+1, iStepBetweenIndices)) + else: + # we are in the running jobs section, and here we expect the strJobArrayDetails to just contain the index of the job array element + iJobArrayElementIndex = int(strJobArrayDetails) + assert(iJobArrayElementIndex != 0) # sge does not allow element indices to be 0 + jobElementsIndexRange = range(iJobArrayElementIndex,iJobArrayElementIndex+1) + for iElementIndex in jobElementsIndexRange: + jobId = None + if bIsJobArray: + jobId = JobId(iJobId, iElementIndex) + else: + jobId = JobId(iJobId) + job = jobsState.getJob(jobId) + #logDebug('iElementIndex = %d job id = %s' % (iElementIndex, jobId.asStr())) + if job == None: + # this job hasn't been encountered yet in the output of qstat ... + # we could either be in the pending jobs section or in the running jobs section + job = Job(jobId) + jobsState.addJob( job ) + job.setState( jobState ) + strJobStartOrSubmitTime = matchObj.group('jobStartOrSubmitTime') + jobStartOrSubmitTime = time.strptime(strJobStartOrSubmitTime, '%m/%d/%Y %H:%M:%S') + if bInPendingJobsSection: + job.setSubmitTime( jobStartOrSubmitTime ) + else: + job.setStartTime( jobStartOrSubmitTime ) + job.setOwner( matchObj.group('jobOwner') ) + job.setScriptName( matchObj.group('jobScriptName') ) + if bInPendingJobsSection: + job.setNumRequiredSlots(int(matchObj.group('numSlots'))) + else: + assert( not bInPendingJobsSection ) # if we are in the pending jobs section, the job should be new + if not bInPendingJobsSection: + job.addSlots( currentQueueMachine.getMachineName(), int(matchObj.group('numSlots')) ) + else: + # the current line does not describe a job + if not bInPendingJobsSection: + # check if this line describes the status of a machine + matchObj = machineRegularExp.match( line ) + if matchObj: + queueName = matchObj.group('queueName') + machineName = matchObj.group('machineName') + queueMachine = QueueMachine( queueName, machineName ) + #log(line) + #log('matchObj.group(queueTypeString) :' + matchObj.group('queueTypeString')) + #log('matchObj.group(numTotalSlots) :' + matchObj.group('numTotalSlots')) + queueMachine.setNumSlots( int( matchObj.group('numTotalSlots') ) ) + + #log('QstatParser::parseQstatOutput : queueName = "'+matchObj.group('queueName')+'"') + #log('QstatParser::parseQstatOutput : machineName = "'+matchObj.group('machineName')+'"') + currentQueueMachine = queueMachine + jobsState.addQueueMachine( queueMachine ) + else: + matchObj = pendingJobsHeaderRegularExp.match( line ) + if matchObj: + bInPendingJobsSection = True + currentQueueMachine = None + else: + #print line + None + else: + # we are in a pending jobs section + matchObj = re.match('^[#]+$', line) + if not matchObj: + # unexpected line + print 'line = "' + line + '"' + assert( False ) + None + line = f.readline() + f.close() + return jobsState + def parseJobDetails( self, qstatOutput, job ): + """ + adds to job the details parsed from the output of the "qstat -j " command + """ + f = StringIO.StringIO(qstatOutput) + line = f.readline() + fieldRegularExp = re.compile( '^(?P[^:]+):[ ]+(?P[?]*)$' ) + while( len(line) > 0 ): + # print line + # check if the current line is a line describing a job running on a machine + matchObj = fieldRegularExp.match( line ) + if matchObj: + fieldName = matchObj.group('fieldName') + strFieldValue = matchObj.group('fieldValue') + if fieldName == 'job_number': + assert( job.getId().asStr() == strFieldValue ) + elif fieldName == 'hard_queue_list': + allowedQueues = strFieldValue.split(',') + assert(len(allowedQueues) > 0) + job.m_jobRequirements.m_queues = allowedQueues + elif fieldName == 'parallel environment': + # the value could be 'ompi range: 32' + matchObj = re.match('ompi range: (?P[0-9]+)[?]*', strFieldValue) + if matchObj: + job.m_jobRequirements.m_parallelEnvironment = ParallelEnvironment.MPI + else: + assert( False ) + else: + # ignore he other fields + None + line = f.readline() + f.close() + \ No newline at end of file diff --git a/ClusterController/QueueMachine.py b/ClusterController/QueueMachine.py new file mode 100644 index 0000000..f62c819 --- /dev/null +++ b/ClusterController/QueueMachine.py @@ -0,0 +1,25 @@ + +class QueueMachine: + """ + a QueueMachine instance represents a given SGE queue on a given machine (eg allintel.q@simpatix10) + """ + def __init__( self, queueName, machineName ): + self.m_queueName = queueName + self.m_machineName = machineName + self.m_numSlots = None + def getName( self ): + """ + returns the name of the machine queue (such as allintel.q@simpatix10) + """ + return self.m_queueName + '@' + self.m_machineName + + def getQueueName( self ): + return self.m_queueName + def getMachineName( self ): + return self.m_machineName + def setNumSlots( self, numSlots ): + self.m_numSlots = numSlots + def getNumSlots( self ): + assert( self.m_numSlots != None ) + return self.m_numSlots + diff --git a/ClusterController/SlotAllocator.py b/ClusterController/SlotAllocator.py new file mode 100644 index 0000000..ee3af29 --- /dev/null +++ b/ClusterController/SlotAllocator.py @@ -0,0 +1,141 @@ +from PowerState import * +from Log import * +import time +import copy + +class Slot: + def __init__( self ): + self.m_queueMachine = None + self.m_numSlots = None + self.m_job = None # job for which this slot is allocated + +class SlotAllocator: + """ + a class that defines a strategy for allocating free slots for the given pending jobs + """ + def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ): + """ + returns the list of machines that need to wake up to make pending jobs running + """ + assert( False ) # this method is abstract + +class SimpleSlotAllocator( SlotAllocator ): + def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ): + machinesThatNeedWakeUp = {} + highestPriorityPendingJob = pendingJobs.values()[0] + logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : looking for free slots for job ' + highestPriorityPendingJob.getId().asStr() ) + numFreeSlots = {} # contains the number of free slots for each queueMachine + for queueMachine in clusterState.getJobsState().getQueueMachines().itervalues(): + numFreeSlots[ queueMachine ] = clusterState.getJobsState().getNumFreeSlotsOnQueueMachine( queueMachine ) + logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : init numFreeSlots[ %s ] with %d ' % (queueMachine.getName(), numFreeSlots[ queueMachine ]) ) + remainingNumSlotsToAllocate = highestPriorityPendingJob.m_jobRequirements.m_numSlots + logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate ) + # first look in running machines if there are available slots + for queueMachine in clusterState.getJobsState().getQueueMachines().itervalues(): + logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.getName() ) + machine = clusterState.getMachines()[ queueMachine.getMachineName() ] + if machine.getPowerState() == PowerState.ON: + if clusterState.queueMachineFitsJobRequirements( queueMachine, highestPriorityPendingJob.m_jobRequirements ): + numSlotsAllocatedOnThisMachine = min( numFreeSlots[ queueMachine ], remainingNumSlotsToAllocate ) + logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : found %d slots on already running %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.getMachineName() ) ) + + remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine + numFreeSlots[ queueMachine ] -= numSlotsAllocatedOnThisMachine + logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate ) + assert( remainingNumSlotsToAllocate >= 0 ) + if remainingNumSlotsToAllocate == 0: + break + if remainingNumSlotsToAllocate > 0: + # now look into machines that are asleep + for queueMachine in clusterState.getJobsState().getQueueMachines().itervalues(): + logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.getName() ) + machine = clusterState.getMachines()[ queueMachine.getMachineName() ] + if machine.getPowerState() == PowerState.SLEEP: + if clusterState.queueMachineFitsJobRequirements( queueMachine, highestPriorityPendingJob.m_jobRequirements ): + numSlotsAllocatedOnThisMachine = min( numFreeSlots[ queueMachine ], remainingNumSlotsToAllocate ) + logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : found %d slots on sleeping %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.getMachineName() ) ) + remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine + numFreeSlots[ queueMachine ] -= numSlotsAllocatedOnThisMachine + machinesThatNeedWakeUp[ machine.getName() ] = machine + logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate ) + assert( remainingNumSlotsToAllocate >= 0 ) + if remainingNumSlotsToAllocate == 0: + break + if remainingNumSlotsToAllocate != 0: + return {} # not enough slots available + return machinesThatNeedWakeUp + +class DecoupledSlotAllocator( SlotAllocator ): + """ + a slot allocator that doesn't know much about sge, and does not attempts to guess what sge'sceduler would do + Instead, it uses a very simple strategy : it wakes up all the machines periodically to allow jobs to get in. + """ + def __init__( self ): + self.m_delayBetweenPeriodicChecks = -1 # in seconds. Disable periodic checks by setting this to -1 + self.m_lastCheckTime = time.time() + self.m_lastClusterState = None + def jobsStateHasChanged( self, newClusterState ): + """ + returns true if there is a change in the cluster state that can cause a pending job + to start (provided all machines are enabled) + """ + oldJobs = {} + if self.m_lastClusterState: + oldJobs = self.m_lastClusterState.m_jobsState.m_jobs + newJobs = newClusterState.m_jobsState.m_jobs + bJobsHaveChanged = False + oldJobsOnly = oldJobs.copy() # shallow copy + #print 'oldJobs : ', oldJobs + #print 'newJobs : ', newJobs + """ + print 'self.m_lastClusterState', self.m_lastClusterState + print 'newClusterState', newClusterState + if self.m_lastClusterState: + print 'self.m_lastClusterState.m_jobsState', self.m_lastClusterState.m_jobsState + print 'newClusterState.m_jobsState', newClusterState.m_jobsState + print 'id(self.m_lastClusterState) : ', id(self.m_lastClusterState) + print 'id(newClusterState) : ', id(newClusterState) + print 'len(oldJobs) : ', len(oldJobs) + print 'len(newJobs) : ', len(newJobs) + print 'id(oldJobs) : ', id(oldJobs) + print 'id(newJobs) : ', id(newJobs) + """ + for newJob in newJobs.itervalues(): + #logDebug('DecoupledSlotAllocator::jobsStateHasChanged newJob id=%s' % newJob.getId().asStr()) + if newJob.getId() in oldJobs: + #logDebug('DecoupledSlotAllocator::jobsStateHasChanged job id=%d is in old jobs' % newJob.getId()) + del oldJobsOnly[newJob.getId()] + else: + # ah ... a new job has arrived + logInfo('A new job (jobId =%s) has been detected ' % newJob.getId().asStr() ) + bJobsHaveChanged = True + if len(oldJobsOnly) != 0: + for oldJob in oldJobsOnly.itervalues(): + logInfo('Job (jobId =%s) has finished' % oldJob.getId().asStr() ) + # at least one old job has finished, freeing some slots + bJobsHaveChanged = True + return bJobsHaveChanged + def getMachinesThatNeedWakeUp( self, pendingJobs, clusterState ): + machinesThatNeedWakeUp = {} + bJobsStateHasChanged = self.jobsStateHasChanged( clusterState ) + currentTime = time.time() + # we do periodic checks to detect changes in cluster state that are not detected by jobsStateHasChanged + # for example changes in the requirements, in the allocation policy, etc... + bItsTimeForPeriodicCheck = False + if self.m_delayBetweenPeriodicChecks > 0: + bItsTimeForPeriodicCheck = (currentTime - self.m_lastCheckTime) > self.m_delayBetweenPeriodicChecks + if bJobsStateHasChanged or bItsTimeForPeriodicCheck: + if bJobsStateHasChanged: + logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep because jobs state has changed') + else: + logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep for periodic check (to be sure pending jobs get a chance to start)') + for queueMachine in clusterState.getJobsState().getQueueMachines().itervalues(): + if queueMachine.getMachineName() in clusterState.getMachines(): + # this means that the machine is under the cluster controller's control + machine = clusterState.getMachines()[ queueMachine.getMachineName() ] + if machine.getPowerState() == PowerState.SLEEP: + machinesThatNeedWakeUp[ machine.getName() ] = machine + self.m_lastCheckTime = currentTime + self.m_lastClusterState = copy.copy(clusterState) + #print 'self.m_lastClusterState', self.m_lastClusterState + return machinesThatNeedWakeUp diff --git a/ClusterController/SunGridEngine.py b/ClusterController/SunGridEngine.py new file mode 100644 index 0000000..9b6465c --- /dev/null +++ b/ClusterController/SunGridEngine.py @@ -0,0 +1,48 @@ +import Util +from QstatParser import * + +class SunGridEngine: + + def getCurrentJobsState( self ): + bBUG_00000009_IS_STILL_ALIVE = True + if bBUG_00000009_IS_STILL_ALIVE: + logDebug('Querying the current state of jobs') + returnCode = -1 + delayBetweenAttemps = 5 # in seconds + while returnCode != 0: + command = ['qstat', '-f', '-u', '*'] + (returnCode, qstatOutput, stderr) = executeProgram( command ) + if returnCode != 0: + logWarning('command "%s" failed (returnCode = %d, stdout="%s", stderr="%s"). Retrying in %d seconds' % (' '.join(command), returnCode, qstatOutput, stderr, delayBetweenAttemps)) + time.sleep(delayBetweenAttemps) + if bBUG_00000009_IS_STILL_ALIVE: + logDebug('Just got current state of jobs') + + jobsState = QstatParser().parseQstatOutput( qstatOutput ) + jobsState.setTime( time.time() ) + + + # read the requirements for pending jobs (which parallel environment, which queue, which architecture) from sge + if False: # no need for job details at the moment and since it's very slow, it's been disabled + for jobId, job in jobsState.getPendingJobs().iteritems(): + (returnCode, stdout, stderr) = executeProgram( ['qstat', '-j', job.getId().asStr()] ) + assert( returnCode != 0, 'prout' ) + QstatParser().parseJobDetails( stdout, job ) + + return jobsState + + def setQueueInstanceActivation( self, strQueueInstanceName, bEnable ): + argument = 'd' + if bEnable: + argument = 'e' + errorCode, stdout, stderr = executeProgram(['qmod', '-'+argument, strQueueInstanceName]) + return (errorCode == 0) + + def queueIsEmpty( self, strMachineName ): + (returnCode, qstatOutput, stderr) = executeProgram( ['qstat', '-f', '-u', '*'] ) + assert( returnCode == 0 ) + jobsState = QstatParser().parseQstatOutput( qstatOutput ) + jobs = jobsState.getJobsOnMachine( strMachineName ) + return (len(jobs) == 0) + + \ No newline at end of file diff --git a/ClusterController/TestBug00000003.py b/ClusterController/TestBug00000003.py new file mode 100755 index 0000000..1be69e7 --- /dev/null +++ b/ClusterController/TestBug00000003.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +import sys +sys.path.insert(0, '..') +from Log import * +import Util +from PowerState import * + +from HTMLParser import HTMLParser + +def Test0000(): + logInfo('Testing bug 00000003 if a series of wake up, goto sleep can shutdown a machine') + strTargetMachineName = 'simpatix12' + ePowerState = Util.getPowerState(strTargetMachineName) + while True: + if ePowerState == PowerState.ON: + bSuccess = Util.blockingPutMachineToSleep(strTargetMachineName) + assert( bSuccess ) + bSuccess = Util.blockingPutMachineToSleep(strTargetMachineName) + ePowerState = PowerState.SLEEP + elif ePowerState == PowerState.SLEEP: + bSuccess = Util.blockingWakeUpMachine(strTargetMachineName) + assert( bSuccess ) + ePowerState = PowerState.ON + else: + assert(False) + +def Test0001(): + logInfo('Testing bug 00000003 : could it be caused by a sleep and a power on at the same tim ?') + strTargetMachineName = 'simpatix12' + ePowerState = Util.getPowerState(strTargetMachineName) + if ePowerState == PowerState.SLEEP: + bSuccess = Util.blockingWakeUpMachine(strTargetMachineName) + assert( bSuccess ) + ePowerState = PowerState.ON + assert(ePowerState == PowerState.ON) + Util.executeCommand("ssh %s 'pmset sleepnow'" % strTargetMachineName ) + bSuccess = Util.blockingWakeUpMachine(strTargetMachineName) + assert(bSuccess) + +def Test0002(): + logInfo('Testing bug 00000003 : could it be caused by a power on quickly followed by a sleep ?') + strTargetMachineName = 'simpatix12' + ePowerState = Util.getPowerState(strTargetMachineName) + if ePowerState == PowerState.ON: + bSuccess = Util.blockingWakeUpMachine(strTargetMachineName) + assert( bSuccess ) + ePowerState = PowerState.SLEEP + assert(ePowerState == PowerState.SLEEP) + Util.executeIpmiCommand( strTargetMachineName, 'chassis power on' ) + Util.executeCommand("ssh %s 'pmset sleepnow'" % strTargetMachineName ) + +if __name__ == '__main__': + Test0000() \ No newline at end of file diff --git a/ClusterController/Util.py b/ClusterController/Util.py new file mode 100644 index 0000000..951f1c3 --- /dev/null +++ b/ClusterController/Util.py @@ -0,0 +1,214 @@ +import Lib.Util +import Lib.SimpaDbUtil +from Log import * +from PowerState import * +import re +import StringIO +import os +import traceback +import sys + +def executeProgram( astrArguments ): + bBUG_00000008_IS_STILL_ACTIVE = True + if bBUG_00000008_IS_STILL_ACTIVE: + logDebug('executeProgram : program = [%s]' % (','.join(astrArguments) )) + (returnCode, stdout, stderr) = Lib.Util.executeProgram( astrArguments ) + if bBUG_00000008_IS_STILL_ACTIVE: + logDebug('executeCommand : return code of [%s] = %d' % (','.join(astrArguments), returnCode)) + return (returnCode, stdout, stderr) + +def executeCommand( command ): + #logDebug('executeCommand : command = ' + command) + (returnCode, stdout, stderr) = Lib.Util.executeCommand( command ) + #logDebug('executeCommand : return code of "'+command+'" = '+str(returnCode)) + return (returnCode, stdout, stderr) + +def executeIpmiCommand( machineName, ipmiCommandArgs ): + lomIpAddress = Lib.SimpaDbUtil.getLightOutManagementIpAddress( machineName ) + lomPasswordFilepath = '/usr/local/etc/LightOutManagementPassword.txt' + astrProgram = ['ipmitool', '-U', 'admin', '-H', lomIpAddress, '-f', lomPasswordFilepath] + astrProgram.extend( ipmiCommandArgs ) + #print 'executeIpmiCommand' + #print astrProgram + bBUG_00000005_IS_STILL_ACTIVE = True + if bBUG_00000005_IS_STILL_ACTIVE: + # bug 00000005 causes ipmitool to randomly fail for no apparent reason (two consecutive calls ight give different errors, these errors not always being timeouts). Therefore we try and try again, until the command succeeds. If we don't do this, cluster controller keeps stopping because ipmi commands fail. The effect of this hack is that the UNPLUGGED power state is no longer detected; therefore, with this hack, cluster controller is expecting all machines to be plugged. + bCommandSucceeded = False + while not bCommandSucceeded: + (returnCode, stdout, stderr) = executeProgram( astrProgram ) + if returnCode == 0: + bCommandSucceeded = True + else: + logWarning('the command "%s" failed. Retrying a bit later' % ' '.join(astrProgram)) + time.sleep(5) # wait for 5 seconds before the next attempt, in order not to saturate activity + else: + (returnCode, stdout, stderr) = executeProgram( astrProgram ) + """ + sh-3.2# ipmitool -U admin -H 129.20.27.220 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State' + Unabled to establish a session with the BMC. + Command failed due to insufficient resources for session (0xFFFEF901) + -> this error means that the number of active conections to the BMC has reached the maximum (usually 5). + + sh-3.2# ipmitool -U admin -H 129.20.27.212 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State' + Unabled to establish a session with the BMC. + Command failed due to Unknown (0xFFFEF923) (0xFFFEF923) + + sh-3.2# ipmitool -U admin -H 129.20.27.212 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State' + Unabled to establish a session with the BMC. + Command failed due to Timeout (0xFFFEF9C3) + """ + + return (returnCode, stdout, stderr) + +def getPowerState( machineName ): + ePowerState = PowerState.UNKNOWN + bPowerStateRead = False + iNumFailedAttempts = 0 + while not bPowerStateRead: + (returnCode, stdout, stderr) = executeIpmiCommand( machineName, ['sensor', 'get', 'ACPI State'] ) + if returnCode == 0: + matchObj = re.search('\[(?PS[0-9][^\:]*)\:', stdout) + bBUG_00000002_IS_STILL_ACTIVE = True + if bBUG_00000002_IS_STILL_ACTIVE: + if matchObj == None: + # the following warning has been commented out because it pollutes the logs and apparently + # it's a 'feature' of all 4 core machines : if the machine is woken up using ipmitool, then + # no power on event is logged ... + #logWarning('degenerate ipmitool output for machine %s (see bug 00000002). Assuming power in on because that''s what I noticed when I had the case.' % machineName) + return PowerState.ON + else: + assert( matchObj ) + strAcpiState = matchObj.group('AcpiState') + if strAcpiState == 'S0/G0': + ePowerState = PowerState.ON + elif strAcpiState == 'S3': # memory is still powered + ePowerState = PowerState.SLEEP + elif strAcpiState == 'S5/G2': # soft-off + ePowerState = PowerState.OFF + else: + print strAcpiState + assert( False ) + bPowerStateRead = True + else: + # error ... it's either because the machine is unplugged or because the machine is busy (well I'm not sure what happened but I had the case where the command failed for no apparent reason, and therefore I suspect it to be busy ). In order to differentiate these 2 cases, we try again and if this caommand fails too many times then we decide it's unplugged (very dodgy I know but I'm disapointed that this command doen't always work, and for now I don't know other ways to differentiate between these cases....) + iMAX_NUM_ATTEMPTS=5 + iNumFailedAttempts += 1 + if iNumFailedAttempts < iMAX_NUM_ATTEMPTS: + logWarning('failed to read the power state of %s. I\'ll try a again a bit later....' % machineName) + time.sleep(5) + else: + logWarning('failed to read the power state of %s too many times. I assume this machine is unplugged' % machineName) + ePowerState = PowerState.UNPLUGGED # too many attempts failed ... I guess it's because the machine is unplugged + bPowerStateRead = True + return ePowerState + +def wakeUpMachine( machineName ): + """ + this method seems more reliable than wake on lan (sometimes, sending wake on lan packet seems to have no effect) + @return true on success, false otherwise + @note I once had this method failing for no obvious reason.. maybe this command does not succeed if the machine is in a transition state + """ + (returnCode, stdout, stderr) = executeIpmiCommand( machineName, ['chassis', 'power', 'on'] ) + bSuccess = (returnCode == 0) # this command can fail if the machine is manually unplugged for example + return bSuccess + +def blockingPutMachineToSleep( machineName ): + """ + @return true on success, false otherwise + """ + logInfo('putting machine %s to sleep...' % machineName) + iMaxNumAttempts = 5 + bSuccess = False + bBUG_00000010_IS_STILL_ALIVE = True + iAttempt = 0 + # note : each sleep order is not actually succeeding (god knows why). Therefore, we need to try again and again. + while not bSuccess: + # note : pmset must be executed as root + (returnCode, stdout, stderr) = executeProgram(['ssh', machineName, 'pmset sleepnow']) + # check if the machine actually went to sleep + iMaxGoToSleepDuration = 30 # in seconds + iDelay = 0 + while iDelay < iMaxGoToSleepDuration: + time.sleep(5) + iDelay += 5 + ePowerState = getPowerState( machineName ) + if ePowerState == PowerState.SLEEP: + logInfo('machine %s is now sleeping (put to sleep succeeded)' % machineName) + return True + else: + if ePowerState != PowerState.ON: + logWarning('unexpectedly, powerState of %s is %s' % (machineName, PowerStateToStr(ePowerState))) + assert(ePowerState == PowerState.ON) + iAttempt += 1 + if iAttempt > iMaxNumAttempts: + if bBUG_00000010_IS_STILL_ALIVE: + logWarning('the attempt to put %s to sleep failed to many times (probably because of bug 00000010 (machine is in a weird state : power on but no ssh possible) ?)... giving up. ' % (machineName)) + return False + else: + logWarning('the attempt to put %s to sleep failed to many times... giving up' % (machineName)) + return False + else: + logWarning('the attempt to put %s to sleep failed... trying again' % (machineName)) + return True + +def blockingWakeUpMachine(machineName): + logInfo('waking up machine %s...' % machineName) + numAttempts = 0 + bWakeUpFailed = True + while bWakeUpFailed: # try more than once because sometimes for an unknown reason, the wake up order is ignored by the machine ... to be investigated + iMaxNumWakeUpAttempts = 50 + iNumWakeUpAttempts = 0 + bWakeUpMachineSucceeded = False + while not bWakeUpMachineSucceeded: + bWakeUpMachineSucceeded = wakeUpMachine( machineName ) + iNumWakeUpAttempts += 1 + # the previous command can fail if the machine is already in a transition + # in that case we try sevral times bevire giving up + if(bWakeUpMachineSucceeded == False): + if iNumWakeUpAttempts < iMaxNumWakeUpAttempts: + iDelay = 5 + logWarning('wake up attempt %d of %s failed... I\'ll try again in %d seconds' % (iNumWakeUpAttempts, machineName, iDelay)) + time.sleep(iDelay) + else: + logWarning('wake up attempt %d of %s failed too many times... giving up' % (iNumWakeUpAttempts, machineName)) + return False # couldn't wake up to machine for whatever reason + + bWakeUpFailed = False + # wait until the machine is operational + WAKEUPTIMEOUT=5*60 # max number of seconds allowed for a machine to be alive after a wakeup request + wakeUpToAliveDuration = 0 + while not Lib.SimpaDbUtil.isMachineResponding( machineName ): + time.sleep(5) + wakeUpToAliveDuration+=5 + if wakeUpToAliveDuration > WAKEUPTIMEOUT: + # the wake up failed for whatever reason (power state changed manually ? wake up order got lost ?) + logWarning('%s took too long (more than %d seconds) to respond after a successful wakeup request.' % (machineName, WAKEUPTIMEOUT)) + bWakeUpFailed = True + break + if bWakeUpFailed: + numAttempts+=1 + if numAttempts >= 2: + logWarning('giving up waking up %s because the wake up request succeeded but the machine never actually came alive (and this too many times)' % (machineName)) + return False # power state changed manually ? + else: + logWarning('attempting to wake up %s one more time' % (machineName)) + else: + # wake up completed + logInfo('Waking up of machine %s completed successfully' % machineName) + return True + +def onException(exception): + sys.stdout.flush() + strExceptionType = type( exception ) + strMessage = 'exception %s : %s\n' % (strExceptionType, exception.message) + #traceback.print_last() + f = StringIO.StringIO() + traceback.print_exc(file=f) + strMessage += f.getvalue() + f.close() + logError(strMessage) + Lib.Util.sendTextMail( 'ClusterController ', 'guillaume.raffy@univ-rennes1.fr', 'ClusterController has stopped because of an exception', strMessage) + executeCommand('kill -9 %d' % os.getpid()) # stop other threads immediately + exit() + + \ No newline at end of file diff --git a/ClusterController/__init__.py b/ClusterController/__init__.py new file mode 100644 index 0000000..5a9ecee --- /dev/null +++ b/ClusterController/__init__.py @@ -0,0 +1 @@ +# this file is here just so that the containing directory is treated as a python package \ No newline at end of file