189 lines
6.5 KiB
Python
189 lines
6.5 KiB
Python
import threading
|
|
import time
|
|
import Lib.Util
|
|
import Lib.SimpaDbUtil
|
|
import os
|
|
import traceback
|
|
import sys
|
|
from PowerState import *
|
|
from QstatParser import *
|
|
import Util
|
|
|
|
class IWakeUpCompleteNotifier:
    """
    interface of the objects that want to be told when a wake up is complete

    implementers are expected to override onWakeUpComplete
    """

    def onWakeUpComplete( self ):
        """
        callback invoked once the wake up is complete

        this base implementation is abstract : it always fails with an AssertionError
        """
        assert False
|
|
|
|
class ISleepCompleteNotifier:
    """
    interface of the objects that want to be told when a sleep attempt is complete

    implementers are expected to override onSleepComplete
    """

    def onSleepComplete( self, bSleepSucceeded ):
        """
        callback invoked once the sleep attempt is complete

        @param bSleepSucceeded True if the machine was put to sleep, False otherwise

        this base implementation is abstract : it always fails with an AssertionError
        """
        assert False
|
|
|
|
class IRequest:
    """
    base class of the requests; each request carries a type identifier and
    knows how to process itself
    """

    # identifiers of the request types
    GO_TO_SLEEP, WAKE_UP, CHECK_POWER_STATE = 1, 2, 3

    def __init__( self, requestType ):
        """
        @param requestType one of the request type identifiers defined in this class
        """
        self.m_type = requestType

    def getType( self ):
        """
        @return the request type identifier given at construction
        """
        return self.m_type

    def process( self, clusterNodeStatusUpdater ):
        """
        processes this request

        this method is abstract : derived classes have to override it

        @param clusterNodeStatusUpdater the object on behalf of which this request is processed
        """
        assert False  # this method is abstract
|
|
|
|
class WakeUpRequest( IRequest ):
    """
    request that wakes up a machine, reactivates its queue and records its
    power state as PowerState.ON
    """

    def __init__( self, wakeUpNotifier ):
        """
        @param wakeUpNotifier object whose onWakeUpComplete method is called once
            the request has been processed; a false value (eg None) disables the notification
        """
        IRequest.__init__( self, IRequest.WAKE_UP )
        self.m_wakeUpNotifier = wakeUpNotifier

    def process( self, clusterNodeStatusUpdater ):
        """
        processes this request

        @param clusterNodeStatusUpdater the status updater of the machine to wake up

        raises AssertionError if the machine is flagged as 'should always be on',
        if the wake up fails or if the queue can't be activated
        """
        assert not clusterNodeStatusUpdater.m_bShouldAlwaysBeOn # are we attempting to wake up a machine that should always be on ?
        machineName = clusterNodeStatusUpdater.getName()
        logInfo('Handling wakeup request for %s' % machineName )

        # failures are raised explicitly (instead of using the assert statement) so
        # that they are still detected when python runs with -O
        if not blockingWakeUpMachine( machineName ):
            raise AssertionError( 'failed to wake up %s' % machineName )

        # activate the associated machine queue
        if not clusterNodeStatusUpdater.setQueueActivation( True ):
            raise AssertionError( 'failed to activate the queue of %s' % machineName )

        # the with statement guarantees that the lock is released even if setPowerState raises
        with clusterNodeStatusUpdater.m_stateLock:
            clusterNodeStatusUpdater.m_clusterNode.setPowerState( PowerState.ON )

        if self.m_wakeUpNotifier:
            logDebug('ClusterNodeStatusUpdater::run : Sending wakeup notification')
            self.m_wakeUpNotifier.onWakeUpComplete()
|
|
|
|
class SleepRequest( IRequest ):
    """
    request that attempts to put a machine to sleep

    the machine is only put to sleep if its queue is empty once deactivated
    """

    def __init__( self, sleepCompleteNotifier ):
        """
        @param sleepCompleteNotifier object whose onSleepComplete method is called with
            the outcome of the request; a false value (eg None) disables the notification
        """
        IRequest.__init__( self, IRequest.GO_TO_SLEEP )
        self.m_sleepCompleteNotifier = sleepCompleteNotifier

    def process( self, clusterNodeStatusUpdater ):
        """
        processes this request

        the queue of the machine is deactivated first. If the queue is empty, the
        machine is put to sleep and its power state is set to PowerState.SLEEP.
        Otherwise the queue is reactivated, the power state is set to
        PowerState.ON and the cluster node is told that the sleep failed.
        In both cases the notifier (if any) receives the outcome.

        @param clusterNodeStatusUpdater the status updater of the machine to put to sleep

        raises AssertionError if the machine is flagged as 'should always be on', if the
        queue activation can't be changed or if the machine can't be put to sleep
        """
        assert not clusterNodeStatusUpdater.m_bShouldAlwaysBeOn # are we attempting to put a machine the should stay on to sleep ?
        machineName = clusterNodeStatusUpdater.getName()
        logInfo('Handling sleep request for %s' % machineName )

        # failures are raised explicitly (instead of using the assert statement) so
        # that they are still detected when python runs with -O
        if not clusterNodeStatusUpdater.setQueueActivation( False ):
            raise AssertionError( 'failed to deactivate the queue of %s' % machineName )

        if clusterNodeStatusUpdater.queueIsEmpty():
            if not blockingPutMachineToSleep( clusterNodeStatusUpdater.m_clusterNodeName ):
                raise AssertionError( 'failed to put %s to sleep' % machineName )
            # now we know that the machine is asleep
            # (the with statement guarantees that the lock is released even if setPowerState raises)
            with clusterNodeStatusUpdater.m_stateLock:
                clusterNodeStatusUpdater.m_clusterNode.setPowerState( PowerState.SLEEP )
            bSleepSucceeded = True
        else:
            # the queue is not empty : reactivate the queue
            if not clusterNodeStatusUpdater.setQueueActivation( True ):
                raise AssertionError( 'failed to reactivate the queue of %s' % machineName )
            with clusterNodeStatusUpdater.m_stateLock:
                clusterNodeStatusUpdater.m_clusterNode.setPowerState( PowerState.ON ) # this is necessary to reenable the various cyclic checks that were disabled on sleep request
            clusterNodeStatusUpdater.m_clusterNode.onSleepFailedBecauseAJobJustArrived()
            bSleepSucceeded = False

        if self.m_sleepCompleteNotifier:
            self.m_sleepCompleteNotifier.onSleepComplete( bSleepSucceeded )
|
|
|
|
class CheckPowerStateRequest( IRequest ):
    """
    request that reads the power state of a machine and forwards the reading
    to the associated cluster node
    """

    def __init__( self ):
        IRequest.__init__( self, IRequest.CHECK_POWER_STATE )

    def process( self, clusterNodeStatusUpdater ):
        """
        processes this request

        @param clusterNodeStatusUpdater the status updater of the machine to check;
            its m_lastPowerStateCheckTime is set to the current time
        """
        # the power state is read before taking the lock, so the lock is not held during the reading
        powerState = Util.getPowerState( clusterNodeStatusUpdater.m_clusterNodeName )
        # the with statement guarantees that the lock is released even if
        # onNewPowerStateReading raises
        with clusterNodeStatusUpdater.m_stateLock:
            clusterNodeStatusUpdater.m_clusterNode.onNewPowerStateReading( powerState )
            clusterNodeStatusUpdater.m_lastPowerStateCheckTime = time.time()
|
|
|
|
class ClusterNodeStatusUpdater( threading.Thread ):
    """
    thread that processes the requests queued for one cluster node

    requests are queued with pushRequest (or requestSleep / requestWakeUp) and
    processed by run, oldest first, at most one per second. run also queues a
    CheckPowerStateRequest when a power state check is due.
    """
    DELAY_BETWEEN_POWERSTATE_CHECKS = 5*60 # in seconds

    def __init__( self, machineName, clusterNode, gridEngine ):
        """
        @param machineName name of the machine, as returned by getName
        @param clusterNode object representing the machine; its power state is updated by the requests
        @param gridEngine object used to activate/deactivate the queue of the machine and to check if it is empty
        """
        threading.Thread.__init__(self)
        self.m_clusterNodeName = machineName
        self.m_clusterNode = clusterNode
        self.m_gridEngine = gridEngine
        self.m_bStop = False # run exits its loop when this becomes True
        self.m_lastPowerStateCheckTime = None # None until the first power state check is performed
        self.m_bCheckPowerState = True # if False, run doesn't schedule power state checks
        self.m_stateLock = threading.Lock() # lock that prevents concurrent access to the state of this instance
        self.m_bShouldAlwaysBeOn = False # indicates that the machine should never go to sleep or off for whatever reason (eg simpatix10)
        self.m_pendingRequestsQueue = [] # requests waiting to be processed, oldest first

    def getGridEngine( self ):
        return self.m_gridEngine

    def getName( self ):
        return self.m_clusterNodeName

    def setShouldAlwaysBeOn( self ):
        """
        flags the machine as one that should never be put to sleep
        """
        print('%s should always be on' % (self.getName()) )
        self.m_bShouldAlwaysBeOn = True

    def pushRequest( self, request ):
        """
        appends the given request at the end of the pending requests queue
        """
        # the with statement guarantees that the lock is always released
        with self.m_stateLock:
            self.m_pendingRequestsQueue.append(request)

    def popRequest( self ):
        """
        removes the oldest pending request from the queue

        @return the oldest pending request, or None if the queue is empty
        """
        with self.m_stateLock:
            if self.m_pendingRequestsQueue:
                return self.m_pendingRequestsQueue.pop(0)
            return None

    def run( self ):
        """
        thread main loop : once per second, processes the oldest pending request
        (if any) and schedules a power state check if one is due

        the loop ends when m_bStop is set; any exception (including
        KeyboardInterrupt) ends the loop and is passed to Util.onException
        """
        try:
            while not self.m_bStop:
                # handle the oldest request
                request = self.popRequest()
                if request is not None:
                    request.process( self )

                # schedule a power state check if required
                # (don't do power checks on machines that should always be on because some current
                # implementations of operations involved might cause the machine to go to sleep)
                if self.m_bCheckPowerState and not self.m_bShouldAlwaysBeOn:
                    currentTime = time.time()
                    lastCheckTime = self.m_lastPowerStateCheckTime
                    # NOTE(review): while a check request is still waiting behind other queued
                    # requests, this condition stays true and one more check request is queued
                    # at each iteration
                    if (not lastCheckTime) or (currentTime > (lastCheckTime + ClusterNodeStatusUpdater.DELAY_BETWEEN_POWERSTATE_CHECKS)):
                        self.pushRequest( CheckPowerStateRequest() )

                time.sleep(1)
        except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
            Util.onException(exception)

    def requestSleep( self, sleepCompleteNotifier = None ):
        """
        queues a request to put the machine to sleep

        @param sleepCompleteNotifier optional object whose onSleepComplete method is called with the outcome of the request
        """
        assert not self.m_bShouldAlwaysBeOn
        self.pushRequest( SleepRequest( sleepCompleteNotifier ) )

    def requestWakeUp( self, wakeUpNotifier = None ):
        """
        queues a request to wake up the machine

        @param wakeUpNotifier optional object whose onWakeUpComplete method is called once the request has been processed
        """
        assert not self.m_bShouldAlwaysBeOn
        self.pushRequest( WakeUpRequest( wakeUpNotifier ) )

    def getQueueMachineName( self ):
        return self.m_clusterNode.getQueueMachineName()

    def setQueueActivation( self, bEnable ):
        """
        @return true on success, false otherwise
        """
        return self.getGridEngine().setQueueInstanceActivation( self.getQueueMachineName(), bEnable )

    def queueIsEmpty( self ):
        return self.getGridEngine().queueIsEmpty( self.getName() )
|