"""Per-node worker thread that serialises sleep, wake-up and power-state-check requests."""
import threading
import time
import Lib.Util
import Lib.SimpaDbUtil
import os
import traceback
import sys
from PowerState import *
from QstatParser import *
import Util


class IWakeUpCompleteNotifier:
    """
    interface for wakeup notifiers
    """

    def onWakeUpComplete(self):
        """Called by WakeUpRequest.process once the wake-up request has been handled."""
        assert False  # this method is abstract


class ISleepCompleteNotifier:
    """
    interface for sleep notifiers
    """

    def onSleepComplete(self, bSleepSucceeded):
        """Called by SleepRequest.process once the sleep request has been handled.

        @param bSleepSucceeded True if the machine was put to sleep, False if the
               sleep was cancelled because the machine's queue was not empty
        """
        assert False  # this method is abstract


class IRequest:
    """Base class of the requests that a ClusterNodeStatusUpdater processes."""

    # request type identifiers
    GO_TO_SLEEP = 1
    WAKE_UP = 2
    CHECK_POWER_STATE = 3

    def __init__(self, requestType):
        """
        @param requestType one of GO_TO_SLEEP, WAKE_UP, CHECK_POWER_STATE
        """
        self.m_type = requestType

    def getType(self):
        """
        @return the request type passed at construction
        """
        return self.m_type

    def process(self, clusterNodeStatusUpdater):
        """
        processes this request

        @param clusterNodeStatusUpdater the ClusterNodeStatusUpdater on whose
               thread (and for whose node) the request is executed
        """
        assert False  # this method is abstract


class WakeUpRequest(IRequest):
    """Request to wake the node up, re-enable its queue and mark it as ON."""

    def __init__(self, wakeUpNotifier):
        """
        @param wakeUpNotifier object whose onWakeUpComplete() is called when the
               request has been handled, or None if no notification is wanted
        """
        IRequest.__init__(self, IRequest.WAKE_UP)
        self.m_wakeUpNotifier = wakeUpNotifier

    def process(self, clusterNodeStatusUpdater):
        """
        Wakes the machine up, activates its queue, records the ON power state
        and finally notifies the notifier (if any).

        A failure of the wake-up or of the queue activation is reported through
        a failing assert, as in the original implementation.
        """
        # are we attempting to wake up a machine that should always be on ?
        assert not clusterNodeStatusUpdater.m_bShouldAlwaysBeOn
        machineName = clusterNodeStatusUpdater.getName()
        logInfo('Handling wakeup request for %s' % machineName)
        bSuccess = blockingWakeUpMachine(machineName)
        assert bSuccess, 'failed to wake up %s' % machineName

        # activate the associated machine queue
        bQueueActivated = clusterNodeStatusUpdater.setQueueActivation(True)
        assert bQueueActivated, 'failed to activate the queue of %s' % machineName

        # 'with' guarantees that the lock is released even if setPowerState raises
        with clusterNodeStatusUpdater.m_stateLock:
            clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.ON)

        if self.m_wakeUpNotifier:
            logDebug('ClusterNodeStatusUpdater::run : Sending wakeup notification')
            self.m_wakeUpNotifier.onWakeUpComplete()


class SleepRequest(IRequest):
    """Request to put the node to sleep, provided that its queue is empty."""

    def __init__(self, sleepCompleteNotifier):
        """
        @param sleepCompleteNotifier object whose onSleepComplete( bSleepSucceeded )
               is called when the request has been handled, or None if no
               notification is wanted
        """
        IRequest.__init__(self, IRequest.GO_TO_SLEEP)
        self.m_sleepCompleteNotifier = sleepCompleteNotifier

    def process(self, clusterNodeStatusUpdater):
        """
        Deactivates the machine's queue, then:
          - if the queue is empty, puts the machine to sleep, records the SLEEP
            power state and notifies success;
          - otherwise reactivates the queue, records the ON power state, tells the
            cluster node that the sleep failed and notifies failure.

        A failure of the queue (de)activation or of the sleep operation is
        reported through a failing assert, as in the original implementation.
        """
        # are we attempting to put a machine the should stay on to sleep ?
        assert not clusterNodeStatusUpdater.m_bShouldAlwaysBeOn
        machineName = clusterNodeStatusUpdater.getName()
        logInfo('Handling sleep request for %s' % machineName)
        if clusterNodeStatusUpdater.setQueueActivation(False):
            if clusterNodeStatusUpdater.queueIsEmpty():
                if blockingPutMachineToSleep(clusterNodeStatusUpdater.m_clusterNodeName):
                    # now we know that the machine is asleep
                    with clusterNodeStatusUpdater.m_stateLock:
                        clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.SLEEP)
                    if self.m_sleepCompleteNotifier:
                        self.m_sleepCompleteNotifier.onSleepComplete(True)
                else:
                    assert False, 'failed to put %s to sleep' % machineName
            else:
                # reactivate the queue
                if not clusterNodeStatusUpdater.setQueueActivation(True):
                    assert False, 'failed to reactivate the queue of %s' % machineName
                with clusterNodeStatusUpdater.m_stateLock:
                    # this is necessary to reenable the various cyclic checks that
                    # were disabled on sleep request
                    clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.ON)
                clusterNodeStatusUpdater.m_clusterNode.onSleepFailedBecauseAJobJustArrived()
                if self.m_sleepCompleteNotifier:
                    self.m_sleepCompleteNotifier.onSleepComplete(False)
        else:
            assert False, 'failed to deactivate the queue of %s' % machineName


class CheckPowerStateRequest(IRequest):
    """Request to read the node's power state and report it to the cluster node."""

    def __init__(self):
        IRequest.__init__(self, IRequest.CHECK_POWER_STATE)

    def process(self, clusterNodeStatusUpdater):
        """
        Reads the power state with Util.getPowerState, passes it to the cluster
        node and records the time of the check.
        """
        # the reading is done outside the lock: only the state update needs protection
        powerState = Util.getPowerState(clusterNodeStatusUpdater.m_clusterNodeName)
        # 'with' guarantees that the lock is released even if the node callback raises
        with clusterNodeStatusUpdater.m_stateLock:
            clusterNodeStatusUpdater.m_clusterNode.onNewPowerStateReading(powerState)
            clusterNodeStatusUpdater.m_lastPowerStateCheckTime = time.time()


class ClusterNodeStatusUpdater(threading.Thread):
    """
    Thread that processes, one at a time and in arrival order, the requests
    (sleep, wake up, power state check) pushed for one cluster node.

    Requires python 2.6 or later ('with' statement and 'except ... as' syntax).
    """

    DELAY_BETWEEN_POWERSTATE_CHECKS = 5 * 60  # in seconds

    def __init__(self, machineName, clusterNode, gridEngine):
        """
        @param machineName name of the machine; returned by getName() and passed
               to the power state / sleep helpers
        @param clusterNode object that receives setPowerState and
               onNewPowerStateReading calls and provides getQueueMachineName
        @param gridEngine object that provides setQueueInstanceActivation and
               queueIsEmpty
        """
        threading.Thread.__init__(self)
        self.m_clusterNodeName = machineName
        self.m_clusterNode = clusterNode
        self.m_gridEngine = gridEngine
        self.m_bStop = False  # run() exits its loop once this is True
        self.m_lastPowerStateCheckTime = None  # None until the first check is done
        self.m_bCheckPowerState = True
        self.m_stateLock = threading.Lock()  # lock that prevents concurrent access to the state of this instance
        self.m_bShouldAlwaysBeOn = False  # indicates that the machine should never go to sleep or off for whatever reason (eg simpatix10)
        self.m_pendingRequestsQueue = []  # oldest request first

    def getGridEngine(self):
        return self.m_gridEngine

    def getName(self):
        return self.m_clusterNodeName

    def setShouldAlwaysBeOn(self):
        """Flags the machine as one that must never be put to sleep."""
        print('%s should always be on' % (self.getName()) )
        self.m_bShouldAlwaysBeOn = True

    def pushRequest(self, request):
        """Appends request at the end of the pending requests queue."""
        with self.m_stateLock:
            self.m_pendingRequestsQueue.append(request)

    def popRequest(self):
        """
        Removes the oldest pending request from the queue.

        @return the oldest pending request, or None if the queue is empty
        """
        with self.m_stateLock:
            if self.m_pendingRequestsQueue:
                return self.m_pendingRequestsQueue.pop(0)
            return None

    def run(self):
        """
        Thread body: once per second, processes the oldest pending request (if
        any), then schedules a power state check if one is due. Loops until
        m_bStop is set; any exception ends the loop and is passed to
        Util.onException.
        """
        try:
            while not self.m_bStop:
                # handle the oldest request
                request = self.popRequest()
                if request is not None:
                    request.process(self)

                # schedule a power state check if required
                currentTime = time.time()
                # don't do power checks on machines that should always be on because
                # some current implementations of operations involved might cause
                # the machine to go to sleep
                if self.m_bCheckPowerState and not self.m_bShouldAlwaysBeOn:
                    lastCheckTime = self.m_lastPowerStateCheckTime
                    # NOTE(review): the last check time is only updated when the check
                    # request is processed, so a backlog of other requests can cause
                    # several check requests to be queued - confirm this is acceptable
                    if (not lastCheckTime) or (currentTime > (lastCheckTime + ClusterNodeStatusUpdater.DELAY_BETWEEN_POWERSTATE_CHECKS)):
                        self.pushRequest(CheckPowerStateRequest())
                time.sleep(1)
        except BaseException as exception:  # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
            Util.onException(exception)

    def requestSleep(self, sleepCompleteNotifier=None):
        """
        Queues a request to put the machine to sleep.

        @param sleepCompleteNotifier optional object whose onSleepComplete is
               called once the request has been handled
        """
        assert not self.m_bShouldAlwaysBeOn
        self.pushRequest(SleepRequest(sleepCompleteNotifier))

    def requestWakeUp(self, wakeUpNotifier=None):
        """
        Queues a request to wake the machine up.

        @param wakeUpNotifier optional object whose onWakeUpComplete is called
               once the request has been handled
        """
        assert not self.m_bShouldAlwaysBeOn
        self.pushRequest(WakeUpRequest(wakeUpNotifier))

    def getQueueMachineName(self):
        return self.m_clusterNode.getQueueMachineName()

    def setQueueActivation(self, bEnable):
        """
        @return true on success, false otherwise
        """
        return self.getGridEngine().setQueueInstanceActivation(self.getQueueMachineName(), bEnable)

    def queueIsEmpty(self):
        return self.getGridEngine().queueIsEmpty(self.getName())