fixed styling errors and added more type hinting to increase maintainability of cocluto

- all styling errors are now fixed, but there are still warnings and information
- most functions have been converted to snake case
- most functions now have type hinting

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3873]
This commit is contained in:
Guillaume Raffy 2024-06-14 15:52:32 +02:00
parent 6bf69f909b
commit 9f4a80b11e
24 changed files with 1037 additions and 951 deletions

View File

@ -1,21 +1,26 @@
#!/usr/bin/env python
from typing import Dict, Optional
import sys
sys.path.insert(0, '..')
import os
import MySQLdb
import threading
from Lib.Util import *
from Lib.SimpaDbUtil import *
import time
from ClusterStatus import ClusterStatus
from SlotAllocator import DecoupledSlotAllocator
from Log import logDebug, logInfo
from ClusterNodeStatusUpdater import IWakeUpCompleteNotifier, ISleepCompleteNotifier
from SunGridEngine import SunGridEngine
from Util import log, onException
from WebServer import WebServerThread
from PowerState import PowerState
from datetime import datetime
if sys.version_info < (3, 0):
from HTMLParser import HTMLParser
else:
from html.parser import HTMLParser
from ..Util import log
# from ..SimpaDbUtil import toto
from .ClusterNode import ClusterNodeId, ClusterNode
from .ClusterStatus import ClusterStatus
from .SlotAllocator import DecoupledSlotAllocator, SlotAllocator
from .Log import logDebug, log_info
from .ClusterNodeStatusUpdater import IWakeUpCompleteNotifier, ISleepCompleteNotifier
from .SunGridEngine import SunGridEngine
from .Util import on_exception
from .WebServer import WebServerThread
from .PowerState import PowerState
VERSION = '1.18'
@ -23,38 +28,38 @@ VERSION = '1.18'
class MyHTMLParser(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
self.TokenList = []
self.token_list = []
def handle_data(self, data):
data = data.strip()
if data and len(data) > 0:
self.TokenList.append(data)
self.token_list.append(data)
# print data
def GetTokenList(self):
return self.TokenList
def get_token_list(self):
return self.token_list
class WakeUpCompleteNotifier(IWakeUpCompleteNotifier):
def __init__(self, machineName, clusterController):
self.m_machineName = machineName
self.m_clusterController = clusterController
self.machine_name = machineName
self.cluster_controller = clusterController
def onWakeUpComplete(self):
logDebug('WakeUpCompleteNotifier::onWakeUpComplete : start')
self.m_clusterController.onMachineWakeUpComplete(self.m_machineName)
def on_wake_up_complete(self):
logDebug('WakeUpCompleteNotifier::on_wake_up_complete : start')
self.cluster_controller.onMachineWakeUpComplete(self.machine_name)
class SleepCompleteNotifier(ISleepCompleteNotifier):
def __init__(self, machineName, clusterController):
self.m_machineName = machineName
self.m_clusterController = clusterController
self.machine_name = machineName
self.cluster_controller = clusterController
def onSleepComplete(self, bSleepSucceeded):
logDebug('SleepCompleteNotifier::onSleepComplete : start')
self.m_clusterController.onMachineSleepComplete(self.m_machineName, bSleepSucceeded)
def on_sleep_complete(self, bSleepSucceeded):
logDebug('SleepCompleteNotifier::on_sleep_complete : start')
self.cluster_controller.onMachineSleepComplete(self.machine_name, bSleepSucceeded)
def jouleToKwh(fEnergyInJoules):
@ -79,23 +84,36 @@ class ClusterController:
jobs (eg add some machines to a queue).
Mechanism to let user get priority
"""
cluster_status: ClusterStatus
slot_allocator = SlotAllocator
machines_that_need_wake_up: Dict[ClusterNodeId, ClusterNode]
machines_that_need_wake_up_lock: threading.Lock # to prevent concurrent access to machines_that_need_wake_up
machines_that_need_sleeping: Dict[ClusterNodeId, ClusterNode]
machines_that_need_sleeping_lock: threading.Lock # to prevent concurrent access to machines_that_need_sleeping
last_energy_status_log_time: Optional[datetime]
DELAY_BETWEEN_ENERGY_STATUS_LOGS: int # in seconds
session_id: Optional[int] # session (run) identifier in database
web_server: WebServerThread
stop: bool
stop_lock: threading.Lock # to prevent concurrent access to stop
def __init__(self):
gridEngine = SunGridEngine()
self.m_clusterStatus = ClusterStatus(gridEngine)
self.m_slotAllocator = DecoupledSlotAllocator() # SimpleSlotAllocator()
self.m_machinesThatNeedWakeUp = {}
self.m_machinesThatNeedWakeupLock = threading.Lock() # to prevent concurrent access to m_machinesThatNeedWakeUp
self.m_machinesThatNeedSleeping = {}
self.m_machinesThatNeedSleepingLock = threading.Lock() # to prevent concurrent access to m_machinesThatNeedSleeping
self.m_lastEnergyStatusLogTime = None
self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60 # in seconds
self.m_iSessionId = None # session (run) identifier in database
self.m_webServer = WebServerThread(self)
self.m_bStop = False
self.m_bStopLock = threading.Lock() # to prevent concurrent access to m_bStop
self.cluster_status = ClusterStatus(gridEngine)
self.slot_allocator = DecoupledSlotAllocator() # SimpleSlotAllocator() pylint: disable=no-value-for-parameter
self.machines_that_need_wake_up = {}
self.machines_that_need_wake_up_lock = threading.Lock()
self.machines_that_need_sleeping = {}
self.machines_that_need_sleeping_lock = threading.Lock()
self.last_energy_status_log_time = None
self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60
self.session_id = None
self.web_server = WebServerThread(self)
self.stop = False
self.stop_lock = threading.Lock()
def getClusterStatus(self):
return self.m_clusterStatus
return self.cluster_status
def log(self, message):
print(message)
@ -104,79 +122,76 @@ class ClusterController:
self.log("ClusterController::shutdownLeastImportantNode : start")
def onMachineWakeUpComplete(self, machineName):
self.m_machinesThatNeedWakeupLock.acquire()
# logDebug('ClusterController::onMachineWakeUpComplete : machine %s old len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)))
del self.m_machinesThatNeedWakeUp[machineName]
# logDebug('ClusterController::onMachineWakeUpComplete : machine %s new len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)))
self.m_machinesThatNeedWakeupLock.release()
self.machines_that_need_wake_up_lock.acquire()
# logDebug('ClusterController::onMachineWakeUpComplete : machine %s old len(self.machines_that_need_wake_up) = %d' % (machineName,len(self.machines_that_need_wake_up)))
del self.machines_that_need_wake_up[machineName]
# logDebug('ClusterController::onMachineWakeUpComplete : machine %s new len(self.machines_that_need_wake_up) = %d' % (machineName,len(self.machines_that_need_wake_up)))
self.machines_that_need_wake_up_lock.release()
logDebug('ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it\'s now awake' % machineName)
def onMachineSleepComplete(self, machineName, bSleepSucceeded):
self.m_machinesThatNeedSleepingLock.acquire()
# logDebug('ClusterController::onMachineSleepComplete : machine %s old len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)))
del self.m_machinesThatNeedSleeping[machineName]
# logDebug('ClusterController::onMachineSleepComplete : machine %s new len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp)))
self.m_machinesThatNeedSleepingLock.release()
self.machines_that_need_sleeping_lock.acquire()
# logDebug('ClusterController::onMachineSleepComplete : machine %s old len(self.machines_that_need_wake_up) = %d' % (machineName,len(self.machines_that_need_wake_up)))
del self.machines_that_need_sleeping[machineName]
# logDebug('ClusterController::onMachineSleepComplete : machine %s new len(self.machines_that_need_wake_up) = %d' % (machineName,len(self.machines_that_need_wake_up)))
self.machines_that_need_sleeping_lock.release()
if bSleepSucceeded:
logDebug('ClusterController::onMachineSleepComplete : removed %s from the list of machines that need sleeping because it\'s now sleeping' % machineName)
else:
logDebug('ClusterController::onMachineSleepComplete : removed %s from the list of machines that need sleeping because it can\'t be put to sleep at the moment (eg a job just arrived)' % machineName)
def getNumPendingWakeUps(self):
self.m_machinesThatNeedWakeupLock.acquire()
numPendingWakeUps = len(self.m_machinesThatNeedWakeUp)
self.m_machinesThatNeedWakeupLock.release()
self.machines_that_need_wake_up_lock.acquire()
numPendingWakeUps = len(self.machines_that_need_wake_up)
self.machines_that_need_wake_up_lock.release()
return numPendingWakeUps
def getNumPendingSleeps(self):
self.m_machinesThatNeedSleepingLock.acquire()
numPendingSleeps = len(self.m_machinesThatNeedSleeping)
self.m_machinesThatNeedSleepingLock.release()
self.machines_that_need_sleeping_lock.acquire()
numPendingSleeps = len(self.machines_that_need_sleeping)
self.machines_that_need_sleeping_lock.release()
return numPendingSleeps
def putIdleMachinesToSleep(self):
self.m_clusterStatus.m_lock.acquire()
idleMachines = self.m_clusterStatus.getIdleMachines()
# logInfo('idleMachines :')
self.m_machinesThatNeedToSleep = []
for machineName, idleMachine in idleMachines.items():
if idleMachine.getPowerState() == PowerState.ON:
# logInfo('\t%s' % machineName)
if idleMachine.getName() != 'simpatix10': # never put simpatix10 to sleep because it's the sge master and is also server for other things
self.m_machinesThatNeedSleeping[idleMachine.getName()] = idleMachine
self.m_clusterStatus.m_lock.release()
self.cluster_status.lock.acquire()
idleMachines = self.cluster_status.get_idle_machines()
# log_info('idleMachines :')
for _machineName, idleMachine in idleMachines.items():
if idleMachine.get_power_state() == PowerState.ON:
# log_info('\t%s' % machineName)
if idleMachine.get_name() != 'simpatix10': # never put simpatix10 to sleep because it's the sge master and is also server for other things
self.machines_that_need_sleeping[idleMachine.get_name()] = idleMachine
self.cluster_status.lock.release()
listOfMachinesThatNeedSleeping = self.m_machinesThatNeedSleeping.values() # duplicate the list so that we don't iterate on m_machinesThatNeedSleeping, which could cause a runtime error because callbacks alter m_machinesThatNeedWakeUp
listOfMachinesThatNeedSleeping = self.machines_that_need_sleeping.values() # duplicate the list so that we don't iterate on machines_that_need_sleeping, which could cause a runtime error because callbacks alter machines_that_need_wake_up
for machine in listOfMachinesThatNeedSleeping:
logInfo('ClusterController::putIdleMachinesToSleep : requesting sleep for %s because it\'s idle' % machine.getName())
machine.requestSleep(SleepCompleteNotifier(machine.getName(), self))
log_info('ClusterController::putIdleMachinesToSleep : requesting sleep for %s because it\'s idle' % machine.get_name())
machine.request_sleep(SleepCompleteNotifier(machine.get_name(), self)) # pylint: disable=no-value-for-parameter
if len(listOfMachinesThatNeedSleeping) != 0:
# hack : wait until the sleep requests are handled so that we don't request the same machine to sleep multiple times
while self.getNumPendingSleeps() > 0:
time.sleep(1)
def wakeUpMachinesForPendingJobs(self):
def wake_up_machinesForPendingJobs(self):
listOfMachinesThatNeedWakeUp = []
self.m_clusterStatus.m_lock.acquire()
pendingJobs = self.m_clusterStatus.getPendingJobs()
"""
logInfo('pending jobs :')
for job in pendingJobs.values():
logInfo('\t%d' % job.getId().asStr())
"""
self.cluster_status.lock.acquire()
pendingJobs = self.cluster_status.get_pending_jobs()
# log_info('pending jobs :')
# for job in pendingJobs.values():
# log_info('\t%d' % job.getId().asStr())
if len(pendingJobs) != 0:
self.m_machinesThatNeedWakeUp = self.m_slotAllocator.getMachinesThatNeedWakeUp(pendingJobs, self.m_clusterStatus)
if len(self.m_machinesThatNeedWakeUp) == 0:
None
# logInfo('ClusterController::updateNormalState : no machine needs waking up')
self.machines_that_need_wake_up = self.slot_allocator.get_machinesThatNeedWakeUp(pendingJobs, self.cluster_status)
if len(self.machines_that_need_wake_up) == 0:
pass
# log_info('ClusterController::updateNormalState : no machine needs waking up')
else:
listOfMachinesThatNeedWakeUp = self.m_machinesThatNeedWakeUp.values() # duplicate the list so that we don't iterate on m_machinesThatNeedWakeUp, which would cause a runtime error because callbacks alter m_machinesThatNeedWakeUp
listOfMachinesThatNeedWakeUp = self.machines_that_need_wake_up.values() # duplicate the list so that we don't iterate on machines_that_need_wake_up, which would cause a runtime error because callbacks alter machines_that_need_wake_up
for machine in listOfMachinesThatNeedWakeUp:
logInfo('ClusterController::wakeUpMachinesForPendingJobs : requesting wake up for ' + machine.getName())
machine.requestWakeUp(WakeUpCompleteNotifier(machine.getName(), self))
self.m_clusterStatus.m_lock.release()
log_info('ClusterController::wake_up_machinesForPendingJobs : requesting wake up for ' + machine.get_name())
machine.request_wake_up(WakeUpCompleteNotifier(machine.get_name(), self)) # pylint: disable=no-value-for-parameter
self.cluster_status.lock.release()
if len(listOfMachinesThatNeedWakeUp) != 0:
# hack : wait until the wakeup requests are handled so that a later sleep request doesn't cancel it
@ -184,16 +199,16 @@ class ClusterController:
while self.getNumPendingWakeUps() > 0:
time.sleep(1)
iSGE_CHEK_RUNNABLE_JOBS_DELAY = 60 * 5 # max time it takes for sge between the fact that a queued job is runnable and SGE actually starting it (I've put a long time here because sometimes, qstat takes a long time to ralise that the machine is available after I wake it up)
logInfo('ClusterController::wakeUpMachinesForPendingJobs : all required machines are awake. Now give %d seconds to SGE to allocate slots.' % iSGE_CHEK_RUNNABLE_JOBS_DELAY)
log_info('ClusterController::wake_up_machinesForPendingJobs : all required machines are awake. Now give %d seconds to SGE to allocate slots.' % iSGE_CHEK_RUNNABLE_JOBS_DELAY)
# wait until SGE has a chance to allocate slots
time.sleep(iSGE_CHEK_RUNNABLE_JOBS_DELAY) # note : this is annoying because it blocks the main thread. This could be improved if we forbid the machines to go to sleep for that much time....
logInfo('ClusterController::wakeUpMachinesForPendingJobs : end of the delay given to SGE to allocate slots')
log_info('ClusterController::wake_up_machinesForPendingJobs : end of the delay given to SGE to allocate slots')
def updateNormalState(self):
# attempt to shut down machines that are idle
self.putIdleMachinesToSleep()
# wake up necessary machines if there are pending jobs
self.wakeUpMachinesForPendingJobs()
self.wake_up_machinesForPendingJobs()
def storeSessionInDatabase(self):
conn = MySQLdb.connect('simpatix10', 'clusterctrl', '', 'clustercontroller')
@ -207,7 +222,7 @@ class ClusterController:
iSessionId = r.fetch_row()[0][0]
# stores information about the session
sqlCommand = "INSERT INTO `sessions_desc` (`start_time`, end_time, `program_version`, `machine_name`, `pid`, num_controlled_machines) VALUES (NOW(), NOW(), '%s', 'simpatix10', %d, %d);" % (VERSION, os.getpid(), len(self.m_clusterStatus.m_clusterNodes))
sqlCommand = "INSERT INTO `sessions_desc` (`start_time`, end_time, `program_version`, `machine_name`, `pid`, num_controlled_machines) VALUES (NOW(), NOW(), '%s', 'simpatix10', %d, %d);" % (VERSION, os.getpid(), len(self.cluster_status.cluster_nodes))
print(sqlCommand)
conn.query(sqlCommand)
@ -225,64 +240,62 @@ class ClusterController:
assert conn
# update energy savings for the current session
sqlCommand = "UPDATE session_to_energy_savings SET energy_savings_kwh=%f WHERE session_id=%d;" % (jouleToKwh(self.m_clusterStatus.getEnergySavings()), self.m_iSessionId)
sqlCommand = "UPDATE session_to_energy_savings SET energy_savings_kwh=%f WHERE session_id=%d;" % (jouleToKwh(self.cluster_status.get_energy_savings()), self.session_id)
print(sqlCommand)
conn.query(sqlCommand)
# update the end time of the current session
sqlCommand = "UPDATE sessions_desc SET end_time=NOW() WHERE session_id=%d;" % (self.m_iSessionId)
sqlCommand = "UPDATE sessions_desc SET end_time=NOW() WHERE session_id=%d;" % (self.session_id)
print(sqlCommand)
conn.query(sqlCommand)
conn.close()
def setControlOnMachine(self, machineName, bControl):
def set_control_on_machine(self, machineName, bControl):
"""
adds or removes the control of ClusterController on the given machine
"""
self.m_clusterStatus.setControlOnMachine(machineName, bControl)
self.cluster_status.set_control_on_machine(machineName, bControl)
def run(self):
"""
"""
self.m_iSessionId = self.storeSessionInDatabase()
self.session_id = self.storeSessionInDatabase()
log("storeSessionInDatabase completed")
DELAY_BETWEEN_MEASURES = 10 # in seconds
self.m_clusterStatus.startReadingThreads()
self.m_webServer.start()
while not self.m_clusterStatus.isReady():
self.cluster_status.start_reading_threads()
self.web_server.start()
while not self.cluster_status.is_ready():
log('waiting for system to be ready')
time.sleep(1)
None
logInfo('ClusterController::run : cluster initial readings have completed')
log_info('ClusterController::run : cluster initial readings have completed')
startTime = time.localtime()
while not self.m_bStop:
while not self.stop:
currentTime = time.time()
# clusterStatus.m_nodesStatus['simpatix10'].dump()
if (not self.m_lastEnergyStatusLogTime) or (currentTime > (self.m_lastEnergyStatusLogTime + self.DELAY_BETWEEN_ENERGY_STATUS_LOGS)):
iNumMachines = len(self.m_clusterStatus.m_clusterNodes)
if (not self.last_energy_status_log_time) or (currentTime > (self.last_energy_status_log_time + self.DELAY_BETWEEN_ENERGY_STATUS_LOGS)):
iNumMachines = len(self.cluster_status.cluster_nodes)
iNumMachinesOn = 0
iNumSleepingMachines = 0
for machine in self.m_clusterStatus.m_clusterNodes.values():
ePowerState = machine.getPowerState()
for machine in self.cluster_status.cluster_nodes.values():
ePowerState = machine.get_power_state()
if ePowerState == PowerState.ON:
iNumMachinesOn += 1
elif ePowerState == PowerState.SLEEP:
iNumSleepingMachines += 1
logInfo('%d machines (%d ON, %d SLEEPING)' % (iNumMachines, iNumMachinesOn, iNumSleepingMachines))
iNumSlots = self.m_clusterStatus.getNumControlledSlots()
iNumUsedSlots = self.m_clusterStatus.getNumUsedSlots()
iNumWastedSlots = self.m_clusterStatus.getNumWastedSlots()
iNumSleepingSlots = self.m_clusterStatus.getNumSleepingSlots()
logInfo('%d slots (%d used, %d wasted, %d sleeping)' % (iNumSlots, iNumUsedSlots, iNumWastedSlots, iNumSleepingSlots))
logInfo('cluster estimated power consumption : %f W (saving from cluster controller : %f W)' % (self.m_clusterStatus.getCurrentPowerConsumption(), self.m_clusterStatus.getCurrentPowerSavings()))
logInfo('cluster estimated energy consumption since %s : %f kWh (saving from cluster controller : %f kWh)' % (time.asctime(startTime), jouleToKwh(self.m_clusterStatus.getEnergyConsumption()), jouleToKwh(self.m_clusterStatus.getEnergySavings())))
log_info('%d machines (%d ON, %d SLEEPING)' % (iNumMachines, iNumMachinesOn, iNumSleepingMachines))
iNumSlots = self.cluster_status.get_num_controlled_slots()
iNumUsedSlots = self.cluster_status.get_num_used_slots()
iNumWastedSlots = self.cluster_status.get_num_wasted_slots()
iNumSleepingSlots = self.cluster_status.get_num_sleeping_slots()
log_info('%d slots (%d used, %d wasted, %d sleeping)' % (iNumSlots, iNumUsedSlots, iNumWastedSlots, iNumSleepingSlots))
log_info('cluster estimated power consumption : %f W (saving from cluster controller : %f W)' % (self.cluster_status.get_current_power_consumption(), self.cluster_status.get_current_power_savings()))
log_info('cluster estimated energy consumption since %s : %f kWh (saving from cluster controller : %f kWh)' % (time.asctime(startTime), jouleToKwh(self.cluster_status.get_energy_consumption()), jouleToKwh(self.cluster_status.get_energy_savings())))
self.updateSessionEnergyConsumptionInDatabase()
self.m_lastEnergyStatusLogTime = currentTime
self.last_energy_status_log_time = currentTime
self.updateNormalState()
time.sleep(DELAY_BETWEEN_MEASURES)
self.m_clusterStatus.stopReadingThreads()
self.cluster_status.stop_reading_threads()
def storeClusterNodeStatus(clusterNodeStatus):
@ -290,19 +303,17 @@ def storeClusterNodeStatus(clusterNodeStatus):
conn = MySQLdb.connect('simpatix10', 'root', '', 'simpa_measurements')
assert conn
# conn.query("""INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('titi', 2000, NOW());""")
'''
conn.query("""SELECT * FROM fan_rpm_logs""")
r=conn.store_result()
print r.fetch_row()[0]
'''
for key, sensor in clusterNodeStatus.m_sensors.items():
sensorId = clusterNodeStatus.m_clusterNodeName + '_' + sensor.m_name
# conn.query("""SELECT * FROM fan_rpm_logs""")
# r=conn.store_result()
# print r.fetch_row()[0]
for _key, sensor in clusterNodeStatus.sensors.items():
sensorId = clusterNodeStatus.cluster_node_name + '_' + sensor.name
if sensor.typeName() == 'Fan':
sqlCommand = """INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.m_rpms) + """, NOW());"""
sqlCommand = """INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.rpms) + """, NOW());"""
print(sqlCommand)
conn.query(sqlCommand)
elif sensor.typeName() == 'Temperature':
sqlCommand = """INSERT INTO `temperature_logs` (`temp_sensor_id`, `temperature`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.m_temperature) + """, NOW());"""
sqlCommand = """INSERT INTO `temperature_logs` (`temp_sensor_id`, `temperature`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.temperature) + """, NOW());"""
print(sqlCommand)
conn.query(sqlCommand)
else:
@ -311,11 +322,11 @@ def storeClusterNodeStatus(clusterNodeStatus):
if __name__ == '__main__':
# Lib.Util.sendTextMail('SimpaCluster <guillaume.raffy@univ-rennes1.fr>', 'guillaume.raffy@univ-rennes1.fr', 'mail subject', 'mail content')
# Lib.Util.send_text_mail('SimpaCluster <guillaume.raffy@univ-rennes1.fr>', 'guillaume.raffy@univ-rennes1.fr', 'mail subject', 'mail content')
try:
logInfo('ClusterController v. %s starting....' % VERSION)
# executeCommand('ping -o -t 1 simpatix310 > /dev/null')
# print executeCommand('ssh simpatix10 "ipmitool sensor"')
log_info('ClusterController v. %s starting....' % VERSION)
# execute_command('ping -o -t 1 simpatix310 > /dev/null')
# print execute_command('ssh simpatix10 "ipmitool sensor"')
# assert False, 'prout'
controller = ClusterController()
controller.run()
@ -323,4 +334,4 @@ if __name__ == '__main__':
# except AssertionError, error:
# except KeyboardInterrupt, error:
except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
onException(exception)
on_exception(exception)

View File

@ -1,142 +1,161 @@
import threading
from PowerState import PowerState, PowerStateToStr
from ClusterNodeStatusUpdater import ClusterNodeStatusUpdater
import Lib.Util
import Lib.SimpaDbUtil
from Log import logInfo, logWarning
from typing import TYPE_CHECKING
from typing import Optional
from .PowerState import PowerState, PowerStateToStr
from .Log import log_info, log_warning
from .ClusterNodeStatusUpdater import ClusterNodeStatusUpdater
if TYPE_CHECKING:
from .ClusterStatus import ClusterStatus
from .SunGridEngine import SunGridEngine
from .ClusterController import SleepCompleteNotifier
from datetime import datetime
ClusterNodeId = str # eg 'physix99'
class ClusterNode:
"""
the state of a machine node
"""
def __init__(self, machineName, cluster, gridEngine):
self.m_name = machineName
self.m_cluster = cluster # the cluster this machine belongs to
self.m_requestedPowerState = PowerState.ON
self.m_powerState = PowerState.UNKNOWN
self.m_lastPowerStateTime = None # time at which the last value of self.m_powerState has been set
self.m_machineStatusUpdater = ClusterNodeStatusUpdater(machineName, self, gridEngine)
self.m_energyConsumption = 0.0 # estimate of the energy consumption of this machine since the start of cluster controller (in joules)
self.m_energySavings = 0.0 # estimate of the energy savings on this machine caused by the cluster controller since it started (in joules)
name: ClusterNodeId
cluster: 'ClusterStatus' # the cluster this machine belongs to
requested_power_state: PowerState
power_state: PowerState
last_power_state_time: Optional[datetime] # time at which the last value of self.power_state has been set
machine_status_updater: ClusterNodeStatusUpdater
energy_consumption: float # estimate of the energy consumption of this machine since the start of cluster controller (in joules)
energy_savings: float # estimate of the energy savings on this machine caused by the cluster controller since it started (in joules)
def getName(self):
return self.m_name
def __init__(self, machine_name: ClusterNodeId, cluster: 'ClusterStatus', grid_engine: 'SunGridEngine'):
self.name = machine_name
self.cluster = cluster # the cluster this machine belongs to
self.requested_power_state = PowerState.ON
self.power_state = PowerState.UNKNOWN
self.last_power_state_time = None # time at which the last value of self.power_state has been set
self.machine_status_updater = ClusterNodeStatusUpdater(machine_name, self, grid_engine)
self.energy_consumption = 0.0 # estimate of the energy consumption of this machine since the start of cluster controller (in joules)
self.energy_savings = 0.0 # estimate of the energy savings on this machine caused by the cluster controller since it started (in joules)
def isReady(self):
if self.m_powerState == PowerState.UNKNOWN:
# logInfo(self.m_name + ' is not ready (waiting for power state)')
def get_name(self) -> ClusterNodeId:
return self.name
def is_ready(self) -> bool:
if self.power_state == PowerState.UNKNOWN:
# log_info(self.name + ' is not ready (waiting for power state)')
return False
if self.m_powerState == PowerState.ON:
if self.power_state == PowerState.ON:
return True
# log(self.m_name + ' is ready')
# log(self.name + ' is ready')
return True
def getPowerState(self):
return self.m_powerState
def get_power_state(self) -> PowerState:
return self.power_state
def setShouldAlwaysBeOn(self):
self.m_machineStatusUpdater.setShouldAlwaysBeOn()
self.setPowerState(PowerState.ON)
def set_should_always_be_on(self):
self.machine_status_updater.set_should_always_be_on()
self.set_power_state(PowerState.ON)
def setPowerState(self, powerState):
def set_power_state(self, power_state: PowerState):
bUpdateRequiredChecks = False
if self.m_powerState == PowerState.UNKNOWN:
logInfo('ClusterNode::setPowerState : ' + self.m_name + '\'s power state has been initialized to ' + PowerStateToStr(powerState))
self.m_powerState = powerState
self.m_lastPowerStateTime = datetime.now()
if self.power_state == PowerState.UNKNOWN:
log_info('ClusterNode::set_power_state : ' + self.name + '\'s power state has been initialized to ' + PowerStateToStr(power_state))
self.power_state = power_state
self.last_power_state_time = datetime.now()
bUpdateRequiredChecks = True
else:
# update the estimation of energy consumption
self.updateEnergyMeasurements()
self.update_energy_measurements()
# then change the power state
if self.m_powerState != powerState:
logInfo('ClusterNode::setPowerState : ' + self.m_name + '\'s power state has been changed to ' + PowerStateToStr(powerState))
self.m_powerState = powerState
self.m_lastPowerStateTime = datetime.now()
if self.power_state != power_state:
log_info('ClusterNode::set_power_state : ' + self.name + '\'s power state has been changed to ' + PowerStateToStr(power_state))
self.power_state = power_state
self.last_power_state_time = datetime.now()
bUpdateRequiredChecks = True
if bUpdateRequiredChecks:
if self.m_powerState == PowerState.ON:
self.m_machineStatusUpdater.m_bCheckPowerState = True
self.m_machineStatusUpdater.m_bCheckSensors = True
elif self.m_powerState == PowerState.OFF:
self.m_machineStatusUpdater.m_bCheckPowerState = True
self.m_machineStatusUpdater.m_bCheckSensors = False
elif self.m_powerState == PowerState.SLEEP:
self.m_machineStatusUpdater.m_bCheckPowerState = True
self.m_machineStatusUpdater.m_bCheckSensors = False
elif self.m_powerState == PowerState.UNPLUGGED:
self.m_machineStatusUpdater.m_bCheckPowerState = True
self.m_machineStatusUpdater.m_bCheckSensors = False
if self.power_state == PowerState.ON:
self.machine_status_updater.check_power_state = True
self.machine_status_updater.check_sensors = True
elif self.power_state == PowerState.OFF:
self.machine_status_updater.check_power_state = True
self.machine_status_updater.check_sensors = False
elif self.power_state == PowerState.SLEEP:
self.machine_status_updater.check_power_state = True
self.machine_status_updater.check_sensors = False
elif self.power_state == PowerState.UNPLUGGED:
self.machine_status_updater.check_power_state = True
self.machine_status_updater.check_sensors = False
else:
assert False
def onNewPowerStateReading(self, powerState):
def on_new_power_state_reading(self, power_state: PowerState):
"""
called when a new powerstate reading arrives
"""
if powerState != self.getPowerState():
if self.getPowerState() != PowerState.UNKNOWN:
logWarning('ClusterNode::onNewPowerStateReading : ' + self.m_name + '\'s power state has been (manually it seems) changed to ' + PowerStateToStr(powerState))
self.setPowerState(powerState)
if power_state != self.get_power_state():
if self.get_power_state() != PowerState.UNKNOWN:
log_warning('ClusterNode::on_new_power_state_reading : ' + self.name + '\'s power state has been (manually it seems) changed to ' + PowerStateToStr(power_state))
self.set_power_state(power_state)
def getPowerConsumptionForPowerState(self, ePowerState):
def get_power_consumption_for_power_state(self, power_state: PowerState) -> float:
"""
returns the power consumption estimation (in watts) of this machine for the given power state
"""
fCurrentIntensity = 0.0
fCurrentVoltage = 220.0
# noticed on 26.08.2009 that putting 22 machines from sleep to on eats 17 A, resulting in difference of 0.77 A per machine
if ePowerState == PowerState.ON:
if power_state == PowerState.ON:
fCurrentIntensity = 0.9 # value when the machine is doing nothing
elif ePowerState == PowerState.OFF:
elif power_state == PowerState.OFF:
fCurrentIntensity = 0.1
elif ePowerState == PowerState.SLEEP:
elif power_state == PowerState.SLEEP:
fCurrentIntensity = 0.1
elif ePowerState == PowerState.UNPLUGGED:
elif power_state == PowerState.UNPLUGGED:
fCurrentIntensity = 0.0
else:
assert False
return fCurrentIntensity * fCurrentVoltage
def updateEnergyMeasurements(self):
timeInterval = datetime.now() - self.m_lastPowerStateTime
self.m_energyConsumption += self.getPowerConsumptionForPowerState(self.m_powerState) * timeInterval.seconds
self.m_energySavings += (self.getPowerConsumptionForPowerState(PowerState.ON) - self.getPowerConsumptionForPowerState(self.m_powerState)) * timeInterval.seconds
self.m_lastPowerStateTime = datetime.now()
# logDebug('energy savings on %s : %f J' %(self.getName(), self.m_energySavings))
def update_energy_measurements(self):
timeInterval = datetime.now() - self.last_power_state_time
self.energy_consumption += self.get_power_consumption_for_power_state(self.power_state) * timeInterval.seconds
self.energy_savings += (self.get_power_consumption_for_power_state(PowerState.ON) - self.get_power_consumption_for_power_state(self.power_state)) * timeInterval.seconds
self.last_power_state_time = datetime.now()
# logDebug('energy savings on %s : %f J' %(self.get_name(), self.energy_savings))
def getEnergyConsumption(self):
def get_energy_consumption(self) -> float:
"""
in joules
"""
self.updateEnergyMeasurements()
return self.m_energyConsumption
self.update_energy_measurements()
return self.energy_consumption
def getPowerConsumption(self):
fCurrentPowerConsumption = self.getPowerConsumptionForPowerState(self.m_powerState)
# logDebug('getPowerConsumption of %s : %f (powerstate = %d)' % (self.getName(), fCurrentPowerConsumption, self.m_powerState))
def get_power_consumption(self) -> float:
fCurrentPowerConsumption = self.get_power_consumption_for_power_state(self.power_state)
# logDebug('get_power_consumption of %s : %f (powerstate = %d)' % (self.get_name(), fCurrentPowerConsumption, self.power_state))
return fCurrentPowerConsumption
def getEnergySavings(self):
self.updateEnergyMeasurements()
return self.m_energySavings
def get_energy_savings(self) -> float:
self.update_energy_measurements()
return self.energy_savings
def onSleepFailedBecauseAJobJustArrived(self):
logInfo('%s was scheduled to sleep but the sleep is canceled because it\'s currently executing a new job' % self.m_name)
def on_sleep_because_a_job_just_arrived(self):
log_info('%s was scheduled to sleep but the sleep is canceled because it\'s currently executing a new job' % self.name)
def requestSleep(self, sleepCompleteNotifier=None):
self.m_machineStatusUpdater.requestSleep(sleepCompleteNotifier)
def request_sleep(self, sleep_complete_notifier: Optional['SleepCompleteNotifier'] = None):
self.machine_status_updater.request_sleep(sleep_complete_notifier)
def requestWakeUp(self, wakeUpCompleteNotifier=None):
self.m_machineStatusUpdater.requestWakeUp(wakeUpCompleteNotifier)
def request_wake_up(self, wake_up_complete_notifier: Optional['SleepCompleteNotifier'] = None):
self.machine_status_updater.request_wake_up(wake_up_complete_notifier)
def getQueueMachineName(self):
return self.getCluster().getJobsState().getQueueMachine(self.m_name).getName()
assert self.m_queueName is not None
return self.m_queueName
def get_queue_machine_name(self) -> ClusterNodeId:
return self.get_cluster().get_jobs_state().get_queue_machine(self.name).get_name()
# assert self.queue_name is not None
# return self.queue_name
def getCluster(self):
return self.m_cluster
def get_cluster(self) -> 'ClusterStatus':
return self.cluster
# from .ClusterStatus import ClusterStatus # noqa: E402, pylint: disable=wrong-import-position
# from .SunGridEngine import SunGridEngine # noqa: E402, pylint: disable=wrong-import-position
# from .ClusterController import SleepCompleteNotifier # noqa: E402, pylint: disable=wrong-import-position

View File

@ -1,39 +1,46 @@
from typing import TYPE_CHECKING
from typing import Optional, List
import threading
import time
import Lib.Util
import Lib.SimpaDbUtil
from PowerState import PowerState
from Log import logInfo, logDebug
from Util import blockingWakeUpMachine, blockingPutMachineToSleep, getPowerState, onException
import abc
from .PowerState import PowerState
from .Log import log_info, logDebug
from .Util import blocking_wake_up_machine, blocking_put_machine_to_sleep, get_power_state, on_exception
if TYPE_CHECKING:
from .ClusterNode import ClusterNodeId, ClusterNode
from .SunGridEngine import SunGridEngine
class IWakeUpCompleteNotifier:
class IWakeUpCompleteNotifier(abc.ABCMeta):
"""
interface for wakeup notifiers
"""
def onWakeUpComplete(self):
@abc.abstractmethod
def on_wake_up_complete(self):
assert False
class ISleepCompleteNotifier:
class ISleepCompleteNotifier(abc.ABCMeta):
"""
interface for sleep notifiers
"""
def onSleepComplete(self, bSleepSucceeded):
@abc.abstractmethod
def on_sleep_complete(self, bSleepSucceeded):
assert False
class IRequest:
class IRequest(abc.ABCMeta):
GO_TO_SLEEP = 1
WAKE_UP = 2
CHECK_POWER_STATE = 3
def __init__(self, requestType):
self.m_type = requestType
self.type = requestType
def getType(self):
return self.m_type
return self.type
@abc.abstractmethod
def process(self, clusterNodeStatusUpdater):
"""
processes this request
@ -43,58 +50,58 @@ class IRequest:
class WakeUpRequest(IRequest):
def __init__(self, wakeUpNotifier):
def __init__(self, wakeUpNotifier: IWakeUpCompleteNotifier):
IRequest.__init__(self, IRequest.WAKE_UP)
self.m_wakeUpNotifier = wakeUpNotifier
self.wake_up_notifier = wakeUpNotifier
def process(self, clusterNodeStatusUpdater):
assert clusterNodeStatusUpdater.m_bShouldAlwaysBeOn is False # are we attempting to wake up a machine that should always be on ?
logInfo('Handling wakeup request for %s' % clusterNodeStatusUpdater.getName())
bSuccess = blockingWakeUpMachine(clusterNodeStatusUpdater.getName())
assert clusterNodeStatusUpdater.should_always_be_on is False # are we attempting to wake up a machine that should always be on ?
log_info('Handling wakeup request for %s' % clusterNodeStatusUpdater.get_name())
bSuccess = blocking_wake_up_machine(clusterNodeStatusUpdater.get_name())
assert bSuccess
# activate the associated machine queue
if clusterNodeStatusUpdater.setQueueActivation(True):
if clusterNodeStatusUpdater.set_queue_activation(True):
pass # all is ok
else:
assert False
clusterNodeStatusUpdater.m_stateLock.acquire()
clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.ON)
clusterNodeStatusUpdater.m_stateLock.release()
if self.m_wakeUpNotifier:
clusterNodeStatusUpdater.state_lock.acquire()
clusterNodeStatusUpdater.cluster_node.set_power_state(PowerState.ON)
clusterNodeStatusUpdater.state_lock.release()
if self.wake_up_notifier:
logDebug('ClusterNodeStatusUpdater::run : Sending wakeup notification')
self.m_wakeUpNotifier.onWakeUpComplete()
self.wake_up_notifier.on_wake_up_complete()
class SleepRequest(IRequest):
def __init__(self, sleepCompleteNotifier):
def __init__(self, sleepCompleteNotifier: ISleepCompleteNotifier):
IRequest.__init__(self, IRequest.GO_TO_SLEEP)
self.m_sleepCompleteNotifier = sleepCompleteNotifier
self.sleep_complete_notifier = sleepCompleteNotifier
def process(self, clusterNodeStatusUpdater):
assert not clusterNodeStatusUpdater.m_bShouldAlwaysBeOn # are we attempting to put a machine the should stay on to sleep ?
logInfo('Handling sleep request for %s' % clusterNodeStatusUpdater.getName())
if clusterNodeStatusUpdater.setQueueActivation(False):
if clusterNodeStatusUpdater.queueIsEmpty():
if blockingPutMachineToSleep(clusterNodeStatusUpdater.m_clusterNodeName):
assert not clusterNodeStatusUpdater.should_always_be_on # are we attempting to put a machine the should stay on to sleep ?
log_info('Handling sleep request for %s' % clusterNodeStatusUpdater.get_name())
if clusterNodeStatusUpdater.set_queue_activation(False):
if clusterNodeStatusUpdater.queue_is_empty():
if blocking_put_machine_to_sleep(clusterNodeStatusUpdater.cluster_node_name):
# now we know that the machine is asleep
clusterNodeStatusUpdater.m_stateLock.acquire()
clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.SLEEP)
clusterNodeStatusUpdater.m_stateLock.release()
if self.m_sleepCompleteNotifier:
self.m_sleepCompleteNotifier.onSleepComplete(True)
clusterNodeStatusUpdater.state_lock.acquire()
clusterNodeStatusUpdater.cluster_node.set_power_state(PowerState.SLEEP)
clusterNodeStatusUpdater.state_lock.release()
if self.sleep_complete_notifier:
self.sleep_complete_notifier.on_sleep_complete(True)
else:
assert False
else:
# reactivate the queue
if not clusterNodeStatusUpdater.setQueueActivation(True):
if not clusterNodeStatusUpdater.set_queue_activation(True):
assert False
clusterNodeStatusUpdater.m_stateLock.acquire()
clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.ON) # this is necessary to reenable the various cyclic checks that were disabled on sleep request
clusterNodeStatusUpdater.m_stateLock.release()
clusterNodeStatusUpdater.m_clusterNode.onSleepFailedBecauseAJobJustArrived()
if self.m_sleepCompleteNotifier:
self.m_sleepCompleteNotifier.onSleepComplete(False)
clusterNodeStatusUpdater.state_lock.acquire()
clusterNodeStatusUpdater.cluster_node.set_power_state(PowerState.ON) # this is necessary to reenable the various cyclic checks that were disabled on sleep request
clusterNodeStatusUpdater.state_lock.release()
clusterNodeStatusUpdater.cluster_node.on_sleep_because_a_job_just_arrived()
if self.sleep_complete_notifier:
self.sleep_complete_notifier.on_sleep_complete(False)
else:
assert False
@ -105,88 +112,99 @@ class CheckPowerStateRequest(IRequest):
IRequest.__init__(self, IRequest.CHECK_POWER_STATE)
def process(self, clusterNodeStatusUpdater):
powerState = getPowerState(clusterNodeStatusUpdater.m_clusterNodeName)
clusterNodeStatusUpdater.m_stateLock.acquire()
clusterNodeStatusUpdater.m_clusterNode.onNewPowerStateReading(powerState)
clusterNodeStatusUpdater.m_lastPowerStateCheckTime = time.time()
clusterNodeStatusUpdater.m_stateLock.release()
powerState = get_power_state(clusterNodeStatusUpdater.cluster_node_name)
clusterNodeStatusUpdater.state_lock.acquire()
clusterNodeStatusUpdater.cluster_node.on_new_power_state_reading(powerState)
clusterNodeStatusUpdater.last_power_check_state_time = time.time()
clusterNodeStatusUpdater.state_lock.release()
class ClusterNodeStatusUpdater(threading.Thread):
cluster_node_name: 'ClusterNodeId'
cluster_node: 'ClusterNode'
grid_engine: 'SunGridEngine'
stop: bool
last_power_check_state_time: Optional[time.time]
check_power_state: bool
check_sensors: Optional[bool]
state_lock: threading.Lock # lock that prevents concurrent access to the state of this instance
should_always_be_on: bool # indicates that the machine should never go to sleep or off for whatever reason (eg simpatix10)
pending_requests_queue: List[IRequest]
DELAY_BETWEEN_POWERSTATE_CHECKS = 5 * 60 # in seconds
def __init__(self, machineName, clusterNode, gridEngine):
def __init__(self, machineName: 'ClusterNodeId', clusterNode: 'ClusterNode', gridEngine: 'SunGridEngine'):
threading.Thread.__init__(self)
self.m_clusterNodeName = machineName
self.m_clusterNode = clusterNode
self.m_gridEngine = gridEngine
self.m_bStop = False
self.m_lastPowerStateCheckTime = None # time.time()
self.m_bCheckPowerState = True
self.m_stateLock = threading.Lock() # lock that prevents concurrent access to the state of this instance
self.m_bShouldAlwaysBeOn = False # indicates that the machine should never go to sleep or off for whatever reason (eg simpatix10)
self.m_pendingRequestsQueue = []
self.cluster_node_name = machineName
self.cluster_node = clusterNode
self.grid_engine = gridEngine
self.stop = False
self.last_power_check_state_time = None
self.check_power_state = True
self.state_lock = threading.Lock()
self.should_always_be_on = False
self.pending_requests_queue = []
self.check_sensors = None
def getGridEngine(self):
return self.m_gridEngine
def get_grid_engine(self):
return self.grid_engine
def getName(self):
return self.m_clusterNodeName
def get_name(self):
return self.cluster_node_name
def setShouldAlwaysBeOn(self):
print('%s should always be on' % (self.getName()))
self.m_bShouldAlwaysBeOn = True
def set_should_always_be_on(self):
print('%s should always be on' % (self.get_name()))
self.should_always_be_on = True
def pushRequest(self, request):
self.m_stateLock.acquire()
self.m_pendingRequestsQueue.append(request)
self.m_stateLock.release()
def push_request(self, request: IRequest):
self.state_lock.acquire()
self.pending_requests_queue.append(request)
self.state_lock.release()
def popRequest(self):
oldestRequest = None
self.m_stateLock.acquire()
if len(self.m_pendingRequestsQueue) != 0:
oldestRequest = self.m_pendingRequestsQueue.pop(0)
self.m_stateLock.release()
return oldestRequest
def pop_request(self) -> IRequest:
oldest_request = None
self.state_lock.acquire()
if len(self.pending_requests_queue) != 0:
oldest_request = self.pending_requests_queue.pop(0)
self.state_lock.release()
return oldest_request
def run(self):
try:
while not self.m_bStop:
while not self.stop:
# handle the oldest request
request = self.popRequest()
request = self.pop_request()
if request is not None:
request.process(self)
# schedule a power state check if required
currentTime = time.time()
if self.m_bCheckPowerState:
if not self.m_bShouldAlwaysBeOn: # don't do power checks on such machines because some current implementations of
if self.check_power_state:
if not self.should_always_be_on: # don't do power checks on such machines because some current implementations of
# operations involved might cause the machine to go to sleep
if (not self.m_lastPowerStateCheckTime) or (currentTime > (self.m_lastPowerStateCheckTime + ClusterNodeStatusUpdater.DELAY_BETWEEN_POWERSTATE_CHECKS)):
self.pushRequest(CheckPowerStateRequest())
if (not self.last_power_check_state_time) or (currentTime > (self.last_power_check_state_time + ClusterNodeStatusUpdater.DELAY_BETWEEN_POWERSTATE_CHECKS)):
self.push_request(CheckPowerStateRequest()) # pylint: disable=no-value-for-parameter
time.sleep(1)
except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
onException(exception)
on_exception(exception)
def requestSleep(self, sleepCompleteNotifier=None):
assert not self.m_bShouldAlwaysBeOn
self.pushRequest(SleepRequest(sleepCompleteNotifier))
def request_sleep(self, sleep_complete_notifier: Optional[ISleepCompleteNotifier] = None):
assert not self.should_always_be_on
self.push_request(SleepRequest(sleep_complete_notifier)) # pylint: disable=no-value-for-parameter
def requestWakeUp(self, wakeUpNotifier=None):
assert self.m_bShouldAlwaysBeOn is False
self.pushRequest(WakeUpRequest(wakeUpNotifier))
def request_wake_up(self, wake_up_complete_notifier: Optional[IWakeUpCompleteNotifier] = None):
assert self.should_always_be_on is False
self.push_request(WakeUpRequest(wake_up_complete_notifier)) # pylint: disable=no-value-for-parameter
def getQueueMachineName(self):
return self.m_clusterNode.getQueueMachineName()
def get_queue_machine_name(self):
return self.cluster_node.get_queue_machine_name()
def setQueueActivation(self, bEnable):
def set_queue_activation(self, bEnable: bool):
"""
@return true on success, false otherwise
"""
return self.getGridEngine().setQueueInstanceActivation(self.getQueueMachineName(), bEnable)
return self.get_grid_engine().set_queue_instance_activation(self.get_queue_machine_name(), bEnable)
def queueIsEmpty(self):
return self.getGridEngine().queueIsEmpty(self.getName())
def queue_is_empty(self):
return self.get_grid_engine().queue_is_empty(self.get_name())

View File

@ -1,11 +1,14 @@
import threading
from JobsStateUpdater import JobsStateUpdater
import Lib.Util
import Lib.SimpaDbUtil
from ClusterNode import ClusterNode
from Log import logInfo, logError
from PowerState import PowerState
import time
from typing import Dict, Optional, List
import threading
from .Job import TaskUid, Task, QueueMachineId, JobRequirements
from .JobsStateUpdater import JobsStateUpdater
from .ClusterNode import ClusterNode, ClusterNodeId
from .Log import log_info, logError
from .PowerState import PowerState
from ..SimpaDbUtil import get_cluster_machines_names
from .SunGridEngine import SunGridEngine
from .JobsState import JobsState
class ClusterStatus:
@ -14,196 +17,203 @@ class ClusterStatus:
@param gridEngine the interface to the batch job tool (in our case it's sun grid engine)
"""
def __init__(self, gridEngine):
self.m_gridEngine = gridEngine
self.m_clusterNodes = {}
self.m_lock = threading.Lock() # to prevent concurrent access to this instance
self.m_jobsStateUpdater = JobsStateUpdater(self)
self.m_jobsState = None
# self.m_controlledMachineNames = ['simpatix30']
self.m_controlledMachineNames = [] # ['simpatix30']
grid_engine: SunGridEngine
cluster_nodes: Dict[ClusterNodeId, ClusterNode]
lock: threading.Lock # to prevent concurrent access to this instance
jobs_state_updater: JobsStateUpdater
jobs_state: Optional[JobsState]
controlled_machine_names: List[ClusterNodeId]
def __init__(self, grid_engine: SunGridEngine):
self.grid_engine = grid_engine
self.cluster_nodes = {}
self.lock = threading.Lock()
self.jobs_state_updater = JobsStateUpdater(self)
self.jobs_state = None
# self.controlled_machine_names = ['simpatix30']
self.controlled_machine_names = [] # ['simpatix30']
if False:
for iMachine in range(11, 40):
if (iMachine == 31) or (iMachine == 32):
continue # these machines don't seem to be able to go to sleep properly (bug 00000010)
if (iMachine == 18):
continue # this machine needs maintenance (restarting because it's very slow for an unknown reason)
self.m_controlledMachineNames.append('simpatix%d' % iMachine)
nodeNames = Lib.SimpaDbUtil.getClusterMachinesNames()
for nodeName in nodeNames:
if nodeName in self.m_controlledMachineNames:
logInfo('machine %s is under the cluster controller\'s control' % nodeName)
clusterNode = ClusterNode(nodeName, self, gridEngine)
if nodeName == 'simpatix10':
clusterNode.setShouldAlwaysBeOn()
self.m_clusterNodes[nodeName] = clusterNode
self.controlled_machine_names.append('simpatix%d' % iMachine)
node_names = get_cluster_machines_names()
for node_name in node_names:
if node_name in self.controlled_machine_names:
log_info('machine %s is under the cluster controller\'s control' % node_name)
cluster_node = ClusterNode(node_name, self, grid_engine)
if node_name == 'simpatix10':
cluster_node.set_should_always_be_on()
self.cluster_nodes[node_name] = cluster_node
return
def setControlOnMachine(self, machineName, bControl):
if bControl:
def set_control_on_machine(self, machine_name: ClusterNodeId, control: bool):
if control:
# add machineName under control of ClusterController
for k, v in self.m_clusterNodes.items():
if v.getName() == machineName:
for _k, v in self.cluster_nodes.items():
if v.get_name() == machine_name:
return # nothing to do : machineName is already under the control of ClusterController
clusterNode = ClusterNode(machineName, self, self.m_gridEngine)
if machineName == 'simpatix10':
clusterNode.setShouldAlwaysBeOn()
self.m_clusterNodes[machineName] = clusterNode
clusterNode.m_machineStatusUpdater.start()
cluster_node = ClusterNode(machine_name, self, self.grid_engine)
if machine_name == 'simpatix10':
cluster_node.set_should_always_be_on()
self.cluster_nodes[machine_name] = cluster_node
cluster_node.machine_status_updater.start()
else:
# remove machineName from control of ClusterController
clusterNode = self.m_clusterNodes.get(machineName)
if clusterNode:
clusterNode.m_machineStatusUpdater.m_bStop = True
clusterNode.m_machineStatusUpdater.join()
self.m_clusterNodes.pop(machineName)
cluster_node = self.cluster_nodes.get(machine_name)
if cluster_node:
cluster_node.machine_status_updater.stop = True
cluster_node.machine_status_updater.join()
self.cluster_nodes.pop(machine_name)
def getGridEngine(self):
return self.m_gridEngine
def get_grid_engine(self) -> SunGridEngine:
return self.grid_engine
def getMachines(self):
return self.m_clusterNodes
def get_machines(self) -> Dict[ClusterNodeId, ClusterNode]:
return self.cluster_nodes
def startReadingThreads(self):
for k, v in self.m_clusterNodes.items():
v.m_machineStatusUpdater.start()
self.m_jobsStateUpdater.start()
def start_reading_threads(self):
for _k, v in self.cluster_nodes.items():
v.machine_status_updater.start()
self.jobs_state_updater.start()
def stopReadingThreads(self):
for k, v in self.m_clusterNodes.items():
v.m_machineStatusUpdater.m_bStop = True
v.m_machineStatusUpdater.join()
self.m_jobsStateUpdater.m_bStop = True
self.m_jobsStateUpdater.join()
def stop_reading_threads(self):
for _k, v in self.cluster_nodes.items():
v.machine_status_updater.stop = True
v.machine_status_updater.join()
self.jobs_state_updater.stop = True
self.jobs_state_updater.join()
def onNewJobsState(self, newJobsState):
# logDebug('ClusterStatus::onNewJobsState : attempting to acquire lock to access m_jobsState')
self.m_lock.acquire()
# logDebug('ClusterStatus::onNewJobsState : got lock to access m_jobsState')
self.m_jobsState = newJobsState
self.m_lock.release()
def on_new_jobs_state(self, new_jobs_state: JobsState):
# logDebug('ClusterStatus::on_new_jobs_state : attempting to acquire lock to access jobs_state')
self.lock.acquire()
# logDebug('ClusterStatus::on_new_jobs_state : got lock to access jobs_state')
self.jobs_state = new_jobs_state
self.lock.release()
def getJobsOnMachine(self, machineName):
return self.m_jobsState.getJobsOnMachine(machineName)
def get_jobs_on_machine(self, machine_name: ClusterNodeId) -> Dict[TaskUid, Task]:
return self.jobs_state.get_jobs_on_machine(machine_name)
def isReady(self):
for k, v in self.m_clusterNodes.items():
if not v.isReady():
logInfo('ClusterStatus::isReady : not ready because of ' + v.getName())
def is_ready(self) -> bool:
for _k, v in self.cluster_nodes.items():
if not v.is_ready():
log_info('ClusterStatus::is_ready : not ready because of ' + v.get_name())
return False
# log('ClusterStatus::isReady() : '+k+' is ready')
# log('ClusterStatus::is_ready() : '+k+' is ready')
# assert(False)
if self.m_jobsState is None:
logInfo('ClusterStatus::isReady : not ready because waiting for jobs state')
if self.jobs_state is None:
log_info('ClusterStatus::is_ready : not ready because waiting for jobs state')
return False
return True
def getIdleMachines(self):
assert self.isReady
def get_idle_machines(self) -> Dict[ClusterNodeId, ClusterNode]:
assert self.is_ready
bBUG_00000009_IS_STILL_ALIVE = True
if bBUG_00000009_IS_STILL_ALIVE:
currentTime = time.time()
fJOBS_STATE_MAX_ALLOWED_AGE = 3600
fJobsStateAge = currentTime - self.m_jobsState.getTime()
fJobsStateAge = currentTime - self.jobs_state.get_time()
if fJobsStateAge > fJOBS_STATE_MAX_ALLOWED_AGE:
logError('ClusterStatus::getIdleMachines : age of jobs state is too old (%f s). This is bug 00000009.' % (fJobsStateAge))
logError('ClusterStatus::get_idle_machines : age of jobs state is too old (%f s). This is bug 00000009.' % (fJobsStateAge))
assert False
idleMachines = {}
for machineName, machine in self.m_clusterNodes.items():
if machine.getPowerState() == PowerState.ON:
jobsOnThisMachine = self.getJobsOnMachine(machineName)
for machineName, machine in self.cluster_nodes.items():
if machine.get_power_state() == PowerState.ON:
jobsOnThisMachine = self.get_jobs_on_machine(machineName)
if len(jobsOnThisMachine) == 0:
idleMachines[machineName] = machine
return idleMachines
def getPendingJobs(self):
return self.m_jobsState.getPendingJobs()
def get_pending_jobs(self) -> Dict[TaskUid, Task]:
return self.jobs_state.get_pending_jobs()
def getJobsState(self):
return self.m_jobsState
def get_jobs_state(self) -> JobsState:
return self.jobs_state
def queueMachineFitsJobRequirements(self, queueMachine, jobRequirements):
if jobRequirements.m_queues:
def queue_machine_fits_job_requirements(self, queue_machine: QueueMachineId, job_requirements: JobRequirements) -> bool:
if job_requirements.queues:
bQueueIsInAllowedQueues = False
for queueName in jobRequirements.m_queues:
if queueName == queueMachine.getQueueName():
for queueName in job_requirements.queues:
if queueName == queue_machine.get_queue_name():
bQueueIsInAllowedQueues = True
if not bQueueIsInAllowedQueues:
logInfo('queueMachineFitsJobRequirements : queueMachine ' + queueMachine.getName() + ' rejected because it\'s not in the allowed queues')
log_info('queue_machine_fits_job_requirements : queue_machine ' + queue_machine.get_name() + ' rejected because it\'s not in the allowed queues')
return False
return True
def getEnergyConsumption(self):
def get_energy_consumption(self) -> float:
"""
returns an estimate of the energy consumption since the start of the cluster controller (in joules)
"""
fEnergyConsumption = 0.0
for machine in self.m_clusterNodes.values():
if machine.isReady(): # there are cases where the machine is not ready yet (for example, it's just been added to clustercontroller's control)
fEnergyConsumption += machine.getEnergyConsumption()
for machine in self.cluster_nodes.values():
if machine.is_ready(): # there are cases where the machine is not ready yet (for example, it's just been added to clustercontroller's control)
fEnergyConsumption += machine.get_energy_consumption()
return fEnergyConsumption
def getEnergySavings(self):
def get_energy_savings(self) -> float:
"""
returns an estimate of the energy saving since the start of the cluster controller (in joules)
"""
fEnergySavings = 0.0
for machine in self.m_clusterNodes.values():
if machine.isReady():
fEnergySavings += machine.getEnergySavings()
for machine in self.cluster_nodes.values():
if machine.is_ready():
fEnergySavings += machine.get_energy_savings()
return fEnergySavings
def getCurrentPowerConsumption(self):
fPowerConsumption = 0.0
for machine in self.m_clusterNodes.values():
if machine.isReady():
fPowerConsumption += machine.getPowerConsumption()
return fPowerConsumption
def get_current_power_consumption(self) -> float:
power_consumption = 0.0
for machine in self.cluster_nodes.values():
if machine.is_ready():
power_consumption += machine.get_power_consumption()
return power_consumption
def getCurrentPowerSavings(self):
fPowerSavings = 0.0
for machine in self.m_clusterNodes.values():
if machine.isReady():
fPowerSavings += machine.getPowerConsumptionForPowerState(PowerState.ON) - machine.getPowerConsumption()
return fPowerSavings
def get_current_power_savings(self) -> float:
power_savings = 0.0
for machine in self.cluster_nodes.values():
if machine.is_ready():
power_savings += machine.get_power_consumption_for_power_state(PowerState.ON) - machine.get_power_consumption()
return power_savings
def getNumControlledSlots(self):
self.m_lock.acquire()
iNumControlledSlots = 0
for machine in self.m_clusterNodes.values():
queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
iNumControlledSlots += queueMachine.getNumSlots()
self.m_lock.release()
return iNumControlledSlots
def get_num_controlled_slots(self) -> int:
self.lock.acquire()
num_controlled_slots = 0
for machine in self.cluster_nodes.values():
queue_machine = self.jobs_state.get_queue_machine(machine.get_name())
num_controlled_slots += queue_machine.get_num_slots()
self.lock.release()
return num_controlled_slots
def getNumUsedSlots(self):
self.m_lock.acquire()
iNumUsedSlots = 0
for machine in self.m_clusterNodes.values():
queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
iNumUsedSlotsOnThisMachine = queueMachine.getNumSlots() - self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
assert iNumUsedSlotsOnThisMachine >= 0
iNumUsedSlots += iNumUsedSlotsOnThisMachine
self.m_lock.release()
return iNumUsedSlots
def get_num_used_slots(self) -> int:
self.lock.acquire()
num_used_slots = 0
for machine in self.cluster_nodes.values():
queue_machine = self.jobs_state.get_queue_machine(machine.get_name())
num_used_slots_on_this_machine = queue_machine.get_num_slots() - self.jobs_state.getNumFreeSlotsOnQueueMachine(queue_machine)
assert num_used_slots_on_this_machine >= 0
num_used_slots += num_used_slots_on_this_machine
self.lock.release()
return num_used_slots
def getNumWastedSlots(self):
self.m_lock.acquire()
def get_num_wasted_slots(self) -> int:
self.lock.acquire()
iNumWastedSlots = 0
for machine in self.m_clusterNodes.values():
if machine.getPowerState() == PowerState.ON:
queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
iNumWastedSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
self.m_lock.release()
for machine in self.cluster_nodes.values():
if machine.get_power_state() == PowerState.ON:
queue_machine = self.jobs_state.get_queue_machine(machine.get_name())
iNumWastedSlots += self.jobs_state.getNumFreeSlotsOnQueueMachine(queue_machine)
self.lock.release()
return iNumWastedSlots
def getNumSleepingSlots(self):
self.m_lock.acquire()
def get_num_sleeping_slots(self) -> int:
self.lock.acquire()
iNumSleepingSlots = 0
for machine in self.m_clusterNodes.values():
if machine.getPowerState() == PowerState.SLEEP:
queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
iNumSleepingSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
self.m_lock.release()
for machine in self.cluster_nodes.values():
if machine.get_power_state() == PowerState.SLEEP:
queue_machine = self.jobs_state.get_queue_machine(machine.get_name())
iNumSleepingSlots += self.jobs_state.getNumFreeSlotsOnQueueMachine(queue_machine)
self.lock.release()
return iNumSleepingSlots

View File

@ -26,7 +26,7 @@ if __name__ == '__main__':
remoteCommand += 'launchctl unload /Library/LaunchDaemons/fr.univ-rennes1.ipr.ClusterController.plist;'
remoteCommand += 'launchctl load /Library/LaunchDaemons/fr.univ-rennes1.ipr.ClusterController.plist;'
command = 'ssh root@'+ machineName +' "'+remoteCommand+'"'
( returnCode, stdout, stderr ) = executeCommand( command )
( returnCode, stdout, stderr ) = execute_command( command )
for strSingleCommand in remoteCommand.split(';'):
print(strSingleCommand)
print(stdout)

View File

@ -1,3 +1,4 @@
import enum
from typing import Optional, Dict, List
from datetime import datetime
@ -13,7 +14,7 @@ class JobStateFlags:
SUSPENDED = 128
class ParallelEnvironment:
class ParallelEnvironment(enum.Enum):
MPI = 1
@ -29,14 +30,14 @@ ResourceRequest = str # eg 'mem_available=5G'
class JobRequirements:
num_slots: Optional[int]
architecture: Optional[str] # machine architecture
m_parallelEnvironment: Optional[int] # todo: make ParallelEnvironment an Enum
parallel_environment: Optional[ParallelEnvironment]
queues: Optional[List[QueueId]] # the list of queues this job is allowed to run on
resources: Optional[List[ResourceRequest]]
def __init__(self):
self.num_slots = None
self.architecture = None
self.m_parallelEnvironment = None
self.parallel_environment = None
self.queues = None
self.resources = None
@ -62,10 +63,10 @@ class TaskUid:
"""
required to use a TaskUid as a dict hash key
"""
hash = self.job_id * self.MAX_NUM_JOBS_IN_ARRAY
_hash = self.job_id * self.MAX_NUM_JOBS_IN_ARRAY
if self.task_id is not None:
hash += self.task_id
return hash
_hash += self.task_id
return _hash
def __eq__(self, other: 'TaskUid'):
"""

View File

@ -1,26 +1,30 @@
from typing import Dict
from .Log import *
from .Job import Task, TaskUid
from datetime import datetime
# from .Log import log_info
from .Job import Task, TaskUid, QueueMachineId
from .QueueMachine import QueueMachine
class JobsState:
"""
represents a snapshot of the state of SGE jobs as seen by the SGE command "qstat -f -u \*"
represents a snapshot of the state of SGE jobs as seen by the SGE command "qstat -f -u \\*"
"""
tasks: Dict[TaskUid, Task]
job_array_tasks: Dict[int, Dict[TaskUid, Task]]
tasks: Dict[TaskUid, Task] # list of tasks
job_array_tasks: Dict[int, Dict[TaskUid, Task]] # a dictionary of jobs for each job array, indexed by job array id
queue_machines: Dict[QueueMachineId, QueueMachine] # list of queue machines such as allintel.q@simpatix10
state_time: datetime # the time at which the state was snapshot
def __init__(self):
self.tasks = {} # list of jobs
self.job_array_tasks = {} # a dictionary of jobs for each job array, indexed by job array id
self.m_queueMachines = {} # list of queue machines such as allintel.q@simpatix10
self.m_stateTime = None # the time at which the state was snapshot
self.tasks = {}
self.job_array_tasks = {}
self.queue_machines = {}
self.state_time = None
def deleteAllJobs(self):
def delete_all_tasks(self):
self.tasks = {}
self.job_array_tasks = {}
def addTask(self, task: Task):
def add_task(self, task: Task):
task_uid = task.get_id()
self.tasks[task_uid] = task
if task_uid.is_job_array_element():
@ -36,56 +40,56 @@ class JobsState:
def get_job_array_tasks(self, job_array_id: int) -> Dict[TaskUid, Task]:
return self.job_array_tasks.get(job_array_id)
def setTime(self, stateTime):
self.m_stateTime = stateTime
def set_time(self, state_time: datetime):
self.state_time = state_time
def getTime(self):
return self.m_stateTime
def get_time(self) -> datetime:
return self.state_time
def getJobsOnMachine(self, machineName):
def get_jobs_on_machine(self, machine_name: str) -> Dict[TaskUid, Task]:
jobs_on_machine = {}
for task_uid, task in self.tasks.items():
for queueMachineName, numSlots in task.get_slots().items():
jobMachineName = queueMachineName.split('@')[1]
if jobMachineName == machineName:
for queue_machine_name, _num_slots in task.get_slots().items():
jobMachineName = queue_machine_name.split('@')[1]
if jobMachineName == machine_name:
jobs_on_machine[task_uid] = task
return jobs_on_machine
def getNumFreeSlotsOnQueueMachine(self, queueMachine):
# logInfo('getNumFreeSlotsOnQueueMachine : looking for free slots on queuemachine %s' % queueMachine.getName())
def get_num_free_slots_on_queue_machine(self, queue_machine: QueueMachine) -> int:
# log_info('getNumFreeSlotsOnQueueMachine : looking for free slots on queuemachine %s' % queueMachine.get_name())
numUsedSlots = 0
for job in self.tasks.values():
numUsedSlotsByThisJob = job.get_slots().get(queueMachine.getName())
numUsedSlotsByThisJob = job.get_slots().get(queue_machine.get_name())
if numUsedSlotsByThisJob is not None:
# logInfo('getNumFreeSlotsOnQueueMachine : job %d uses %d slots' % (job.getId().asStr(), numUsedSlotsByThisJob))
# log_info('getNumFreeSlotsOnQueueMachine : job %d uses %d slots' % (job.getId().asStr(), numUsedSlotsByThisJob))
numUsedSlots += numUsedSlotsByThisJob
else:
None
# logInfo('getNumFreeSlotsOnQueueMachine : job %d uses no slot' % job.getId().asStr())
numFreeSlots = queueMachine.getNumSlots() - numUsedSlots
pass
# log_info('getNumFreeSlotsOnQueueMachine : job %d uses no slot' % job.getId().asStr())
numFreeSlots = queue_machine.get_num_slots() - numUsedSlots
assert numFreeSlots >= 0
return numFreeSlots
def addQueueMachine(self, queueMachine):
self.m_queueMachines[queueMachine.getName()] = queueMachine
def add_queue_machine(self, queue_machine: QueueMachine):
self.queue_machines[queue_machine.get_name()] = queue_machine
def getQueueMachine(self, machineName):
def get_queue_machine(self, machine_name) -> QueueMachine:
"""
finds the queue machine associated with a machine
"""
queueMachine = None
for qmName, qm in self.m_queueMachines.items():
if qm.m_machineName == machineName:
for _qname, qm in self.queue_machines.items():
if qm.machine_name == machine_name:
assert queueMachine is None # to be sure that no more than one queue machine is on a given machine
queueMachine = qm
return queueMachine
def getQueueMachines(self):
return self.m_queueMachines
def get_queue_machines(self) -> Dict[QueueMachineId, QueueMachine]:
return self.queue_machines
def getPendingJobs(self):
pendingJobs = {}
for jobId, job in self.tasks.items():
if job.is_pending():
pendingJobs[job.get_id()] = job
return pendingJobs
def get_pending_jobs(self) -> Dict[TaskUid, Task]:
pending_jobs = {}
for _task_id, task in self.tasks.items():
if task.is_pending():
pending_jobs[task.get_id()] = task
return pending_jobs

View File

@ -1,35 +1,39 @@
from typing import TYPE_CHECKING
import threading
import Util
import os
import traceback
import sys
import time
from .Util import on_exception
if TYPE_CHECKING:
from .ClusterStatus import ClusterStatus
class JobsStateUpdater(threading.Thread):
cluster_status: 'ClusterStatus'
stop: bool
DELAY_BETWEEN_STATUS_CHECKS = 10 # in seconds
def __init__(self, clusterStatus):
threading.Thread.__init__(self)
self.m_clusterStatus = clusterStatus
self.m_bStop = False
self.cluster_status = clusterStatus
self.stop = False
def getName( self ):
def get_name(self):
return 'JobsStateUpdater'
def getGridEngine( self ):
return self.m_clusterStatus.getGridEngine()
def get_grid_engine(self):
return self.cluster_status.get_grid_engine()
def updateClusterStatus(self):
# log('JobsStateUpdater::updateClusterStatus : start')
jobsState = self.getGridEngine().getCurrentJobsState()
jobsState = self.get_grid_engine().getCurrentJobsState()
# update the jobs in the cluster status
self.m_clusterStatus.onNewJobsState( jobsState )
self.cluster_status.on_new_jobs_state(jobsState)
# log('JobsStateUpdater::updateClusterStatus : end')
def run(self):
try:
while not self.m_bStop :
while not self.stop:
self.updateClusterStatus()
time.sleep(JobsStateUpdater.DELAY_BETWEEN_STATUS_CHECKS)
except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
Util.onException(exception)
except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
on_exception(exception)

View File

@ -5,10 +5,10 @@ gLogFilePath = '/tmp/ClusterController.log' # '/var/log/ClusterController.log'
def log(message):
threadName = threading.currentThread().getName()
threadName = threading.currentThread().get_name()
logMessage = time.asctime(time.localtime()) + ' : ' + threadName + ' : ' + message
print(logMessage)
f = open(gLogFilePath, 'a+')
f = open(gLogFilePath, 'a+', encoding='utf8')
assert f
try:
f.write(logMessage + '\n')
@ -21,11 +21,11 @@ def logDebug(message):
return
def logInfo(message):
def log_info(message):
log('[I]' + message)
def logWarning(message):
def log_warning(message):
log('[W]' + message)

View File

@ -1,5 +1,8 @@
class PowerState:
import enum
class PowerState(enum.Enum):
UNKNOWN = 0
OFF = 1
ON = 2

View File

@ -2,7 +2,6 @@ import io
import re
from .JobsState import JobsState
from .QueueMachine import QueueMachine, QueueMachineStateFlags
from .Util import *
from .Log import logError
from .Job import JobStateFlags, TaskUid, Task, ParallelEnvironment, JobState
import logging
@ -54,13 +53,13 @@ class QstatParser:
assert False, 'unhandled queue machine state flag :"' + c + '"'
return queueMachineState
def parseQstatOutput(self, qstatOutput, cluster_domain: str = 'ipr.univ-rennes1.fr'):
def parse_qstat_output(self, qstat_output: str, cluster_domain: str = 'ipr.univ-rennes1.fr'):
"""
parses result of command 'qstat -f -u \\* -pri'
cluster_domain: network domain of the cluster (eg 'ipr.univ-rennes.fr'). This information is missing from qstat's output and is used to form the fully qualified domain name of the cluster machines.
"""
logging.debug('qstatOutput type : %s' % type(qstatOutput))
logging.debug('qstatOutput type : %s', type(qstat_output))
def parse_pending_tasks(task_ranges_sequence):
"""
@ -99,13 +98,13 @@ class QstatParser:
# ---------------------------------------------------------------------------------
# main.q@physix88.ipr.univ-renne BIP 0/0/36 14.03 lx-amd64
# TODO: fix this properly by parsing the output of 'qstat -f -u \* -xml' instead of 'qstat -f -u \*'
qstatOutput = re.sub(r'\.ipr\.univ[^ ]*', f'.{cluster_domain}', qstatOutput)
qstat_output = re.sub(r'\.ipr\.univ[^ ]*', f'.{cluster_domain}', qstat_output)
jobsState = JobsState()
f = io.StringIO(qstatOutput)
f = io.StringIO(qstat_output)
line = f.readline()
currentQueueMachine = None
bInPendingJobsSection = False
current_queue_machine = None
in_pending_jobs_section = False
# examples of job line :
# 43521 0.55108 Confidiso3 aghoufi r 08/19/2009 18:40:09 1
# a typical job line in the pending jobs section looks like this :
@ -120,42 +119,42 @@ class QstatParser:
# ntckts The job's ticket amount in normalized fashion.
# ppri The job's -p priority as specified by the user.
jobRegularExp = re.compile(r'^[ ]*(?P<jobId>[^ ]+)[ ]+(?P<JobPriority>[0-9.]+)[ ]+(?P<nurg>[0-9.]+)[ ]+(?P<npprior>[0-9.]+)[ ]+(?P<ntckts>[0-9.]+)[ ]+(?P<ppri>-?[0-9]+)[ ]+(?P<jobScriptName>[^ ]+)[ ]+(?P<jobOwner>[^ ]+)[ ]+(?P<jobStatus>[^ ]+)[ ]+(?P<jobStartOrSubmitTime>[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+(?P<numSlots>[0-9]+)[ ]+(?P<jobArrayDetails>[^\n]*)[\s]*$')
job_regular_exp = re.compile(r'^[ ]*(?P<jobId>[^ ]+)[ ]+(?P<JobPriority>[0-9.]+)[ ]+(?P<nurg>[0-9.]+)[ ]+(?P<npprior>[0-9.]+)[ ]+(?P<ntckts>[0-9.]+)[ ]+(?P<ppri>-?[0-9]+)[ ]+(?P<jobScriptName>[^ ]+)[ ]+(?P<jobOwner>[^ ]+)[ ]+(?P<jobStatus>[^ ]+)[ ]+(?P<jobStartOrSubmitTime>[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+(?P<numSlots>[0-9]+)[ ]+(?P<jobArrayDetails>[^\n]*)[\s]*$')
# example of machine line :
# allintel.q@simpatix34.univ-ren BIP 0/6/8 6.00 darwin-x86
machineRegularExp = re.compile(r'^(?P<queueName>[^@]+)@(?P<machineName>[^ ]+)[ ]+(?P<queueTypeString>[^ ]+)[ ]+(?P<numReservedSlots>[^/]+)/(?P<numUsedSlots>[^/]+)/(?P<numTotalSlots>[^ ]+)[ ]+(?P<cpuLoad>[^ ]+)[\s]+(?P<archName>[^ ]+)[\s]+(?P<queueMachineStatus>[^\s]*)')
pendingJobsHeaderRegularExp = re.compile('^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS[?]*')
machine_regular_exp = re.compile(r'^(?P<queueName>[^@]+)@(?P<machineName>[^ ]+)[ ]+(?P<queueTypeString>[^ ]+)[ ]+(?P<numReservedSlots>[^/]+)/(?P<numUsedSlots>[^/]+)/(?P<numTotalSlots>[^ ]+)[ ]+(?P<cpuLoad>[^ ]+)[\s]+(?P<archName>[^ ]+)[\s]+(?P<queueMachineStatus>[^\s]*)')
pending_jobs_header_regular_exp = re.compile('^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS[?]*')
while len(line) > 0:
# print line
# check if the current line is a line describing a job running on a machine
matchObj = jobRegularExp.match(line)
if matchObj:
match_obj = job_regular_exp.match(line)
if match_obj:
# we are dealing with a job line
if not bInPendingJobsSection:
assert currentQueueMachine
if not in_pending_jobs_section:
assert current_queue_machine
# log('QstatParser::parseQstatOutput : jobId = "'+matchObj.group('jobId')+'"')
job_id = int(matchObj.group('jobId'))
logging.debug('iJobId = %d' % job_id)
jobState = self.parseJobState(matchObj.group('jobStatus'))
strJobArrayDetails = matchObj.group('jobArrayDetails')
bIsJobArray = (len(strJobArrayDetails) != 0)
job_id = int(match_obj.group('jobId'))
logging.debug('iJobId = %d', job_id)
job_state = self.parseJobState(match_obj.group('jobStatus'))
job_array_details = match_obj.group('jobArrayDetails')
is_job_array = (len(job_array_details) != 0)
# logDebug('strJobArrayDetails = "%s", bIsJobArray=%d' % (strJobArrayDetails, int(bIsJobArray)))
# each element of a job array is treated as a separate job for the sake of simplicity.
# For these elements, the job id in sge sense is the same, but they are different in this program's sense
task_ids = range(0, 1) # just one element, unless it's a job array
if bIsJobArray:
if bInPendingJobsSection:
task_ids = parse_pending_tasks(strJobArrayDetails)
if is_job_array:
if in_pending_jobs_section:
task_ids = parse_pending_tasks(job_array_details)
else:
# we are in the running jobs section, and here we expect the strJobArrayDetails to just contain the index of the job array element
iJobArrayElementIndex = int(strJobArrayDetails)
assert iJobArrayElementIndex != 0 # sge does not allow element indices to be 0
task_ids = range(iJobArrayElementIndex, iJobArrayElementIndex + 1)
logging.debug('task_ids = %s' % task_ids)
task_id = int(job_array_details)
assert task_id != 0 # sge does not allow element indices to be 0
task_ids = range(task_id, task_id + 1)
logging.debug('task_ids = %s', task_ids)
for task_id in task_ids:
logging.debug('task_id = %s' % task_id)
logging.debug('task_id = %s', task_id)
task_uid = None
if bIsJobArray:
if is_job_array:
task_uid = TaskUid(job_id, task_id)
else:
task_uid = TaskUid(job_id)
@ -165,57 +164,57 @@ class QstatParser:
# this job hasn't been encountered yet in the output of qstat ...
# we could either be in the pending jobs section or in the running jobs section
task = Task(task_uid)
jobsState.addTask(task)
task.set_state(jobState)
strJobStartOrSubmitTime = matchObj.group('jobStartOrSubmitTime')
jobStartOrSubmitTime = time.strptime(strJobStartOrSubmitTime, '%m/%d/%Y %H:%M:%S')
if bInPendingJobsSection:
task.get_submit_time(jobStartOrSubmitTime)
jobsState.add_task(task)
task.set_state(job_state)
job_start_or_submit_time_as_str = match_obj.group('jobStartOrSubmitTime')
job_start_or_submit_time = time.strptime(job_start_or_submit_time_as_str, '%m/%d/%Y %H:%M:%S')
if in_pending_jobs_section:
task.get_submit_time(job_start_or_submit_time)
else:
task.set_start_time(jobStartOrSubmitTime)
task.set_owner(matchObj.group('jobOwner'))
task.set_script_name(matchObj.group('jobScriptName'))
if bInPendingJobsSection:
task.set_num_required_slots(int(matchObj.group('numSlots')))
task.set_start_time(job_start_or_submit_time)
task.set_owner(match_obj.group('jobOwner'))
task.set_script_name(match_obj.group('jobScriptName'))
if in_pending_jobs_section:
task.set_num_required_slots(int(match_obj.group('numSlots')))
else:
assert not bInPendingJobsSection # if we are in the pending jobs section, the job should be new
if not bInPendingJobsSection:
task.add_slots(currentQueueMachine.getName(), int(matchObj.group('numSlots')))
assert not in_pending_jobs_section # if we are in the pending jobs section, the job should be new
if not in_pending_jobs_section:
task.add_slots(current_queue_machine.get_name(), int(match_obj.group('numSlots')))
else:
# the current line does not describe a job
if not bInPendingJobsSection:
if not in_pending_jobs_section:
# check if this line describes the status of a machine
matchObj = machineRegularExp.match(line)
if matchObj:
queueName = matchObj.group('queueName')
machineName = matchObj.group('machineName')
queueMachine = QueueMachine(queueName, machineName)
match_obj = machine_regular_exp.match(line)
if match_obj:
queue_name = match_obj.group('queueName')
machine_name = match_obj.group('machineName')
queue_machine = QueueMachine(queue_name, machine_name)
# log(line)
# log('matchObj.group(queueTypeString) :' + matchObj.group('queueTypeString'))
# log('matchObj.group(numTotalSlots) :' + matchObj.group('numTotalSlots'))
queueMachine.setNumSlots(int(matchObj.group('numTotalSlots')))
queueMachine.setNumUsedSlots(int(matchObj.group('numUsedSlots')))
strCpuLoad = matchObj.group('cpuLoad')
if strCpuLoad != '-NA-':
queueMachine.setCpuLoad(float(strCpuLoad))
queue_machine.set_num_slots(int(match_obj.group('numTotalSlots')))
queue_machine.set_num_used_slots(int(match_obj.group('numUsedSlots')))
cpu_load_as_str = match_obj.group('cpuLoad')
if cpu_load_as_str != '-NA-':
queue_machine.set_cpu_load(float(cpu_load_as_str))
strQueueMachineState = matchObj.group('queueMachineStatus')
queueMachine.setState(self.parseQueueMachineState(strQueueMachineState))
queue_machine_state_as_str = match_obj.group('queueMachineStatus')
queue_machine.set_state(self.parseQueueMachineState(queue_machine_state_as_str))
# log('QstatParser::parseQstatOutput : queueName = "'+matchObj.group('queueName')+'"')
# log('QstatParser::parseQstatOutput : machineName = "'+matchObj.group('machineName')+'"')
currentQueueMachine = queueMachine
jobsState.addQueueMachine(queueMachine)
current_queue_machine = queue_machine
jobsState.add_queue_machine(queue_machine)
else:
matchObj = pendingJobsHeaderRegularExp.match(line)
if matchObj:
bInPendingJobsSection = True
currentQueueMachine = None
match_obj = pending_jobs_header_regular_exp.match(line)
if match_obj:
in_pending_jobs_section = True
current_queue_machine = None
else:
pass
else:
# we are in a pending jobs section
matchObj = re.match('^[#]+$', line)
if not matchObj:
match_obj = re.match('^[#]+$', line)
if not match_obj:
# unexpected line
print('line = "' + line + '"')
assert False
@ -223,11 +222,11 @@ class QstatParser:
f.close()
return jobsState
def parseJobDetails(self, qstatOutput, job):
def parse_job_details(self, qstat_output: str, task: Task):
"""
adds to job the details parsed from the output of the "qstat -j <jobid>" command
"""
f = io.StringIO(qstatOutput)
f = io.StringIO(qstat_output)
line = f.readline()
fieldRegularExp = re.compile('^(?P<fieldName>[^:]+):[ ]+(?P<fieldValue>[?]*)$')
while len(line) > 0:
@ -238,20 +237,20 @@ class QstatParser:
fieldName = matchObj.group('fieldName')
strFieldValue = matchObj.group('fieldValue')
if fieldName == 'job_number':
assert job.getId().asStr() == strFieldValue
assert task.getId().asStr() == strFieldValue
elif fieldName == 'hard_queue_list':
allowedQueues = strFieldValue.split(',')
assert len(allowedQueues) > 0
job.m_jobRequirements.m_queues = allowedQueues
task.job_requirements.queues = allowedQueues
elif fieldName == 'parallel environment':
# the value could be 'ompi range: 32'
matchObj = re.match('ompi range: (?P<numSlots>[0-9]+)[?]*', strFieldValue)
if matchObj:
job.m_jobRequirements.m_parallelEnvironment = ParallelEnvironment.MPI
task.job_requirements.parallel_environment = ParallelEnvironment.MPI
else:
assert False
else:
# ignore he other fields
None
# ignore the other fields
pass
line = f.readline()
f.close()

View File

@ -1,3 +1,7 @@
from typing import Optional
from .Job import QueueMachineId
from .ClusterNode import ClusterNodeId
class QueueMachineStateFlags: #
DISABLED = 1 # the queue machine is disabled
@ -12,70 +16,73 @@ class QueueMachine:
"""
a QueueMachine instance represents a given SGE queue on a given machine (eg allintel.q@simpatix10)
"""
def __init__(self, queueName, machineName):
self.m_queueName = queueName
self.m_machineName = machineName
self.m_numSlots = None
self.m_numUsedSlots = None
self.m_fCpuLoad = None
self.m_stateFlags = 0
self.m_strDisableMessage = ''
queue_name: str
machine_name: ClusterNodeId
num_slots: Optional[int]
num_used_slots: Optional[int]
cpu_load: Optional[float]
state_flags: int
disable_message: str
def getName(self):
def __init__(self, queueName, machineName):
self.queue_name = queueName
self.machine_name = machineName
self.num_slots = None
self.num_used_slots = None
self.cpu_load = None
self.state_flags = 0
self.disable_message = ''
def get_name(self) -> QueueMachineId:
"""
returns the name of the machine queue (such as allintel.q@simpatix10)
"""
return self.m_queueName + '@' + self.m_machineName
return self.queue_name + '@' + self.machine_name
def getQueueName(self):
return self.m_queueName
def get_queue_name(self) -> str:
return self.queue_name
def getMachineName(self):
return self.m_machineName
def get_machine_name(self) -> str:
return self.machine_name
def setNumSlots(self, numSlots):
self.m_numSlots = numSlots
def set_num_slots(self, num_slots: int):
self.num_slots = num_slots
def setNumUsedSlots(self, numSlots):
self.m_numUsedSlots = numSlots
def set_num_used_slots(self, num_slots: int):
self.num_used_slots = num_slots
def getNumSlots(self):
assert self.m_numSlots is not None
return self.m_numSlots
def get_num_slots(self) -> int:
assert self.num_slots is not None
return self.num_slots
def getNumUsedSlots(self):
assert self.m_numUsedSlots is not None
return self.m_numUsedSlots
def get_num_used_slots(self) -> int:
assert self.num_used_slots is not None
return self.num_used_slots
def setCpuLoad(self, fCpuLoad):
self.m_fCpuLoad = fCpuLoad
def set_cpu_load(self, cpu_load: float):
self.cpu_load = cpu_load
def cpuLoadIsAvailable(self):
return self.m_fCpuLoad is not None
def cpu_load_is_available(self) -> bool:
return self.cpu_load is not None
def getCpuLoad(self):
assert self.m_fCpuLoad is not None
return self.m_fCpuLoad
def get_cpu_load(self) -> float:
assert self.cpu_load is not None
return self.cpu_load
def setState(self, state):
self.m_stateFlags = state
def set_state(self, state: int):
self.state_flags = state
def isDisabled(self):
return self.m_stateFlags & QueueMachineStateFlags.DISABLED
def is_disabled(self) -> bool:
return self.state_flags & QueueMachineStateFlags.DISABLED
def isInErrorState(self):
return self.m_stateFlags & QueueMachineStateFlags.ERROR
def is_in_error_state(self) -> bool:
return self.state_flags & QueueMachineStateFlags.ERROR
def isResponding(self):
return not (self.m_stateFlags & QueueMachineStateFlags.UNKNOWN)
def is_responding(self) -> bool:
return not (self.state_flags & QueueMachineStateFlags.UNKNOWN)
def isInAlarmState(self):
return self.m_stateFlags & QueueMachineStateFlags.ALARM
def is_in_alarm_state(self) -> bool:
return self.state_flags & QueueMachineStateFlags.ALARM
def isSuspended(self):
return self.m_stateFlags & QueueMachineStateFlags.SUSPENDED
"""
def getStateAsString(self):
assert(self.m_strState is not None)
return self.m_strState
"""
def is_suspended(self) -> bool:
return self.state_flags & QueueMachineStateFlags.SUSPENDED

View File

@ -1,21 +1,24 @@
from PowerState import PowerState
from Log import logInfo
import abc
import time
import copy
from .PowerState import PowerState
from .Log import log_info
class Slot:
def __init__(self):
self.m_queueMachine = None
self.m_numSlots = None
self.m_job = None # job for which this slot is allocated
self.queue_machine = None
self.num_slots = None
self.jobs = None # job for which this slot is allocated
class SlotAllocator:
class SlotAllocator(abc.ABCMeta):
"""
a class that defines a strategy for allocating free slots for the given pending jobs
"""
def getMachinesThatNeedWakeUp(self, pendingJobs, clusterState):
@abc.abstractmethod
def get_machinesThatNeedWakeUp(self, pendingJobs, clusterState):
"""
returns the list of machines that need to wake up to make pending jobs running
"""
@ -23,44 +26,45 @@ class SlotAllocator:
class SimpleSlotAllocator(SlotAllocator):
def getMachinesThatNeedWakeUp(self, pendingJobs, clusterState):
def get_machinesThatNeedWakeUp(self, pendingJobs, clusterState):
machinesThatNeedWakeUp = {}
highestPriorityPendingJob = pendingJobs.values()[0]
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : looking for free slots for job ' + highestPriorityPendingJob.getId().asStr())
log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : looking for free slots for job ' + highestPriorityPendingJob.getId().asStr())
numFreeSlots = {} # contains the number of free slots for each queueMachine
for queueMachine in clusterState.getJobsState().getQueueMachines().values():
numFreeSlots[queueMachine] = clusterState.getJobsState().getNumFreeSlotsOnQueueMachine(queueMachine)
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : init numFreeSlots[%s] with %d ' % (queueMachine.getName(), numFreeSlots[queueMachine]))
remainingNumSlotsToAllocate = highestPriorityPendingJob.m_jobRequirements.m_numSlots
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate)
for queueMachine in clusterState.get_jobs_state().get_queue_machines().values():
numFreeSlots[queueMachine] = clusterState.get_jobs_state().getNumFreeSlotsOnQueueMachine(queueMachine)
log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : init numFreeSlots[%s] with %d ' % (queueMachine.get_name(), numFreeSlots[queueMachine]))
remainingNumSlotsToAllocate = highestPriorityPendingJob.job_requirements.num_slots
log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate)
# first look in running machines if there are available slots
for queueMachine in clusterState.getJobsState().getQueueMachines().values():
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.getName())
machine = clusterState.getMachines()[queueMachine.getMachineName()]
if machine.getPowerState() == PowerState.ON:
if clusterState.queueMachineFitsJobRequirements(queueMachine, highestPriorityPendingJob.m_jobRequirements):
for queueMachine in clusterState.get_jobs_state().get_queue_machines().values():
log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.get_name())
machine = clusterState.get_machines()[queueMachine.get_machine_name()]
if machine.get_power_state() == PowerState.ON:
if clusterState.queue_machine_fits_job_requirements(queueMachine, highestPriorityPendingJob.job_requirements):
numSlotsAllocatedOnThisMachine = min(numFreeSlots[queueMachine], remainingNumSlotsToAllocate)
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : found %d slots on already running %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.getMachineName()))
log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : found %d slots on already running %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.get_machine_name()))
remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine
numFreeSlots[queueMachine] -= numSlotsAllocatedOnThisMachine
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate)
log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate)
assert remainingNumSlotsToAllocate >= 0
if remainingNumSlotsToAllocate == 0:
break
if remainingNumSlotsToAllocate > 0:
# now look into machines that are asleep
for queueMachine in clusterState.getJobsState().getQueueMachines().values():
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.getName())
machine = clusterState.getMachines()[queueMachine.getMachineName()]
if machine.getPowerState() == PowerState.SLEEP:
if clusterState.queueMachineFitsJobRequirements(queueMachine, highestPriorityPendingJob.m_jobRequirements):
for queueMachine in clusterState.get_jobs_state().get_queue_machines().values():
log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.get_name())
machine = clusterState.get_machines()[queueMachine.get_machine_name()]
if machine.get_power_state() == PowerState.SLEEP:
if clusterState.queue_machine_fits_job_requirements(queueMachine, highestPriorityPendingJob.job_requirements):
numSlotsAllocatedOnThisMachine = min(numFreeSlots[queueMachine], remainingNumSlotsToAllocate)
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : found %d slots on sleeping %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.getMachineName()))
log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : found %d slots on sleeping %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.get_machine_name()))
remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine
numFreeSlots[queueMachine] -= numSlotsAllocatedOnThisMachine
machinesThatNeedWakeUp[machine.getName()] = machine
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate)
machinesThatNeedWakeUp[machine.get_name()] = machine
log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate)
assert remainingNumSlotsToAllocate >= 0
if remainingNumSlotsToAllocate == 0:
break
@ -75,9 +79,9 @@ class DecoupledSlotAllocator(SlotAllocator):
Instead, it uses a very simple strategy : it wakes up all the machines periodically to allow jobs to get in.
"""
def __init__(self):
self.m_delayBetweenPeriodicChecks = -1 # in seconds. Disable periodic checks by setting this to -1
self.m_lastCheckTime = time.time()
self.m_lastClusterState = None
self.delay_between_periodic_checks = -1 # in seconds. Disable periodic checks by setting this to -1
self.last_check_time = time.time()
self.last_cluster_state = None
def jobsStateHasChanged(self, newClusterState):
"""
@ -85,26 +89,24 @@ class DecoupledSlotAllocator(SlotAllocator):
to start (provided all machines are enabled)
"""
oldJobs = {}
if self.m_lastClusterState:
oldJobs = self.m_lastClusterState.m_jobsState.m_jobs
newJobs = newClusterState.m_jobsState.m_jobs
if self.last_cluster_state:
oldJobs = self.last_cluster_state.jobs_state.jobs
newJobs = newClusterState.jobs_state.jobs
bJobsHaveChanged = False
oldJobsOnly = oldJobs.copy() # shallow copy
# print 'oldJobs : ', oldJobs
# print 'newJobs : ', newJobs
"""
print 'self.m_lastClusterState', self.m_lastClusterState
print 'newClusterState', newClusterState
if self.m_lastClusterState:
print 'self.m_lastClusterState.m_jobsState', self.m_lastClusterState.m_jobsState
print 'newClusterState.m_jobsState', newClusterState.m_jobsState
print 'id(self.m_lastClusterState) : ', id(self.m_lastClusterState)
print 'id(newClusterState) : ', id(newClusterState)
print 'len(oldJobs) : ', len(oldJobs)
print 'len(newJobs) : ', len(newJobs)
print 'id(oldJobs) : ', id(oldJobs)
print 'id(newJobs) : ', id(newJobs)
"""
# print 'self.last_cluster_state', self.last_cluster_state
# print 'newClusterState', newClusterState
# if self.last_cluster_state:
# print 'self.last_cluster_state.jobs_state', self.last_cluster_state.jobs_state
# print 'newClusterState.jobs_state', newClusterState.jobs_state
# print 'id(self.last_cluster_state) : ', id(self.last_cluster_state)
# print 'id(newClusterState) : ', id(newClusterState)
# print 'len(oldJobs) : ', len(oldJobs)
# print 'len(newJobs) : ', len(newJobs)
# print 'id(oldJobs) : ', id(oldJobs)
# print 'id(newJobs) : ', id(newJobs)
for newJob in newJobs.values():
# logDebug('DecoupledSlotAllocator::jobsStateHasChanged newJob id=%s' % newJob.getId().asStr())
if newJob.getId() in oldJobs:
@ -112,36 +114,36 @@ class DecoupledSlotAllocator(SlotAllocator):
del oldJobsOnly[newJob.getId()]
else:
# ah ... a new job has arrived
logInfo('A new job (jobId =%s) has been detected ' % newJob.getId().asStr())
log_info('A new job (jobId =%s) has been detected ' % newJob.getId().asStr())
bJobsHaveChanged = True
if len(oldJobsOnly) != 0:
for oldJob in oldJobsOnly.values():
logInfo('Job (jobId =%s) has finished' % oldJob.getId().asStr())
log_info('Job (jobId =%s) has finished' % oldJob.getId().asStr())
# at least one old job has finished, freeing some slots
bJobsHaveChanged = True
return bJobsHaveChanged
def getMachinesThatNeedWakeUp(self, pendingJobs, clusterState):
def get_machinesThatNeedWakeUp(self, pendingJobs, clusterState):
machinesThatNeedWakeUp = {}
bJobsStateHasChanged = self.jobsStateHasChanged(clusterState)
bJobsStateHasChanged = self.jobsStateHasChanged(clusterState) # pylint: disable=no-value-for-parameter
currentTime = time.time()
# we do periodic checks to detect changes in cluster state that are not detected by jobsStateHasChanged
# for example changes in the requirements, in the allocation policy, etc...
bItsTimeForPeriodicCheck = False
if self.m_delayBetweenPeriodicChecks > 0:
bItsTimeForPeriodicCheck = (currentTime - self.m_lastCheckTime) > self.m_delayBetweenPeriodicChecks
if self.delay_between_periodic_checks > 0:
bItsTimeForPeriodicCheck = (currentTime - self.last_check_time) > self.delay_between_periodic_checks
if bJobsStateHasChanged or bItsTimeForPeriodicCheck:
if bJobsStateHasChanged:
logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep because jobs state has changed')
log_info('DecoupledSlotAllocator::get_machinesThatNeedWakeUp : waking up machines that are asleep because jobs state has changed')
else:
logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep for periodic check (to be sure pending jobs get a chance to start)')
for queueMachine in clusterState.getJobsState().getQueueMachines().values():
if queueMachine.getMachineName() in clusterState.getMachines():
log_info('DecoupledSlotAllocator::get_machinesThatNeedWakeUp : waking up machines that are asleep for periodic check (to be sure pending jobs get a chance to start)')
for queueMachine in clusterState.get_jobs_state().get_queue_machines().values():
if queueMachine.get_machine_name() in clusterState.get_machines():
# this means that the machine is under the cluster controller's control
machine = clusterState.getMachines()[queueMachine.getMachineName()]
if machine.getPowerState() == PowerState.SLEEP:
machinesThatNeedWakeUp[machine.getName()] = machine
self.m_lastCheckTime = currentTime
self.m_lastClusterState = copy.copy(clusterState)
# print 'self.m_lastClusterState', self.m_lastClusterState
machine = clusterState.get_machines()[queueMachine.get_machine_name()]
if machine.get_power_state() == PowerState.SLEEP:
machinesThatNeedWakeUp[machine.get_name()] = machine
self.last_check_time = currentTime
self.last_cluster_state = copy.copy(clusterState)
# print 'self.last_cluster_state', self.last_cluster_state
return machinesThatNeedWakeUp

View File

@ -1,58 +1,58 @@
import time
from Util import executeProgram
from QstatParser import QstatParser
from Log import logDebug, logWarning
from .Util import execute_program
from .QstatParser import QstatParser
from .Log import logDebug, log_warning
class SunGridEngine:
def getCurrentJobsState(self):
def get_current_job_state(self):
bBUG_00000009_IS_STILL_ALIVE = True
if bBUG_00000009_IS_STILL_ALIVE:
logDebug('Querying the current state of jobs')
returnCode = -1
delayBetweenAttemps = 5 # in seconds
while returnCode != 0:
return_code = -1
delay_between_attempts = 5 # in seconds
while return_code != 0:
command = ['qstat', '-f', '-u', '*']
(returnCode, qstatOutput, stderr) = executeProgram(command)
if returnCode != 0:
logWarning('command "%s" failed (returnCode = %d, stdout="%s", stderr="%s"). Retrying in %d seconds' % (' '.join(command), returnCode, qstatOutput, stderr, delayBetweenAttemps))
time.sleep(delayBetweenAttemps)
(return_code, qstat_output, stderr) = execute_program(command)
if return_code != 0:
log_warning('command "%s" failed (returnCode = %d, stdout="%s", stderr="%s"). Retrying in %d seconds' % (' '.join(command), return_code, qstat_output, stderr, delay_between_attempts))
time.sleep(delay_between_attempts)
if bBUG_00000009_IS_STILL_ALIVE:
logDebug('Just got current state of jobs')
jobsState = QstatParser().parseQstatOutput(qstatOutput)
jobsState.setTime(time.time())
jobs_state = QstatParser().parse_qstat_output(qstat_output)
jobs_state.set_time(time.time())
# read the requirements for pending jobs (which parallel environment, which queue, which architecture) from sge
if False: # no need for job details at the moment and since it's very slow, it's been disabled
for unused_jobId, job in jobsState.getPendingJobs().items():
(returnCode, stdout, stderr) = executeProgram(['qstat', '-j', job.getId().asStr()])
assert returnCode != 0, 'prout'
QstatParser().parseJobDetails(stdout, job)
for unused_jobId, job in jobs_state.get_pending_jobs().items():
(return_code, stdout, stderr) = execute_program(['qstat', '-j', job.getId().asStr()])
assert return_code != 0, 'prout'
QstatParser().parse_job_details(stdout, job)
return jobsState
return jobs_state
def setQueueInstanceActivation(self, strQueueInstanceName, bEnable):
def set_queue_instance_activation(self, queue_instance_name: str, enable: bool):
argument = 'd'
if bEnable:
if enable:
argument = 'e'
bBUG_00000269_IS_STILL_ALIVE = True # for some reason, qmod -d (and maybe any sge command) could fail with error: commlib error: can't connect to service (Address already in use)
delayBetweenAttemps = 5 # in seconds
delay_between_attempts = 5 # in seconds
while True:
errorCode, unused_stdout, unused_stderr = executeProgram(['qmod', '-' + argument, strQueueInstanceName])
error_code, unused_stdout, unused_stderr = execute_program(['qmod', '-' + argument, queue_instance_name])
if bBUG_00000269_IS_STILL_ALIVE:
# if the command failed, try again
if errorCode == 0:
if error_code == 0:
break
time.sleep(delayBetweenAttemps)
time.sleep(delay_between_attempts)
else:
break
return (errorCode == 0)
return (error_code == 0)
def queueIsEmpty(self, strMachineName):
(returnCode, qstatOutput, unused_stderr) = executeProgram(['qstat', '-f', '-u', '*'])
def queue_is_empty(self, machine_name: str):
(returnCode, qstat_output, unused_stderr) = execute_program(['qstat', '-f', '-u', '*'])
assert returnCode == 0
jobsState = QstatParser().parseQstatOutput(qstatOutput)
jobs = jobsState.getJobsOnMachine(strMachineName)
jobs_state = QstatParser().parse_qstat_output(qstat_output)
jobs = jobs_state.get_jobs_on_machine(machine_name)
return (len(jobs) == 0)

View File

@ -1,24 +1,21 @@
#!/usr/bin/env python
import sys
sys.path.insert(0, '..')
from Log import logInfo
import Util
from PowerState import PowerState
from HTMLParser import HTMLParser
from .Log import log_info
from .Util import get_power_state, blocking_put_machine_to_sleep, blocking_wake_up_machine, execute_command, execute_ipmi_command
from .PowerState import PowerState
def Test0000():
logInfo('Testing bug 00000003 if a series of wake up, goto sleep can shutdown a machine')
strTargetMachineName = 'simpatix12'
ePowerState = Util.getPowerState(strTargetMachineName)
log_info('Testing bug 00000003 if a series of wake up, goto sleep can shutdown a machine')
strTarget_machine_name = 'simpatix12'
ePowerState = get_power_state(strTarget_machine_name)
while True:
if ePowerState == PowerState.ON:
bSuccess = Util.blockingPutMachineToSleep(strTargetMachineName)
bSuccess = blocking_put_machine_to_sleep(strTarget_machine_name)
assert bSuccess
bSuccess = Util.blockingPutMachineToSleep(strTargetMachineName)
bSuccess = blocking_put_machine_to_sleep(strTarget_machine_name)
ePowerState = PowerState.SLEEP
elif ePowerState == PowerState.SLEEP:
bSuccess = Util.blockingWakeUpMachine(strTargetMachineName)
bSuccess = blocking_wake_up_machine(strTarget_machine_name)
assert bSuccess
ePowerState = PowerState.ON
else:
@ -26,30 +23,30 @@ def Test0000():
def Test0001():
logInfo('Testing bug 00000003 : could it be caused by a sleep and a power on at the same tim ?')
strTargetMachineName = 'simpatix12'
ePowerState = Util.getPowerState(strTargetMachineName)
log_info('Testing bug 00000003 : could it be caused by a sleep and a power on at the same tim ?')
strTarget_machine_name = 'simpatix12'
ePowerState = get_power_state(strTarget_machine_name)
if ePowerState == PowerState.SLEEP:
bSuccess = Util.blockingWakeUpMachine(strTargetMachineName)
bSuccess = blocking_wake_up_machine(strTarget_machine_name)
assert bSuccess
ePowerState = PowerState.ON
assert ePowerState == PowerState.ON
Util.executeCommand("ssh %s 'pmset sleepnow'" % strTargetMachineName)
bSuccess = Util.blockingWakeUpMachine(strTargetMachineName)
execute_command("ssh %s 'pmset sleepnow'" % strTarget_machine_name)
bSuccess = blocking_wake_up_machine(strTarget_machine_name)
assert bSuccess
def Test0002():
logInfo('Testing bug 00000003 : could it be caused by a power on quickly followed by a sleep ?')
strTargetMachineName = 'simpatix12'
ePowerState = Util.getPowerState(strTargetMachineName)
log_info('Testing bug 00000003 : could it be caused by a power on quickly followed by a sleep ?')
strTarget_machine_name = 'simpatix12'
ePowerState = get_power_state(strTarget_machine_name)
if ePowerState == PowerState.ON:
bSuccess = Util.blockingWakeUpMachine(strTargetMachineName)
bSuccess = blocking_wake_up_machine(strTarget_machine_name)
assert bSuccess
ePowerState = PowerState.SLEEP
assert ePowerState == PowerState.SLEEP
Util.executeIpmiCommand(strTargetMachineName, 'chassis power on')
Util.executeCommand("ssh %s 'pmset sleepnow'" % strTargetMachineName)
execute_ipmi_command(strTarget_machine_name, 'chassis power on')
execute_command("ssh %s 'pmset sleepnow'" % strTarget_machine_name)
if __name__ == '__main__':

View File

@ -1,80 +1,81 @@
# import .Util
# import ..SimpaDbUtil
from .Log import logDebug, logInfo, logWarning, logError
from .PowerState import PowerState, PowerStateToStr
import re
import io
import os
import traceback
import sys
import time
from ..Util import execute_program as exe_prog
from ..Util import execute_command as exe_comm
from ..Util import send_text_mail
from ..SimpaDbUtil import getLightOutManagementIpAddress, is_machine_responding
from .Log import logDebug, log_info, log_warning, logError
from .PowerState import PowerState, PowerStateToStr
def executeProgram(astrArguments):
def execute_program(astrArguments):
bBUG_00000008_IS_STILL_ACTIVE = True
if bBUG_00000008_IS_STILL_ACTIVE:
logDebug('executeProgram : program = [%s]' % (','.join(astrArguments)))
(returnCode, stdout, stderr) = Lib.Util.executeProgram(astrArguments)
logDebug('execute_program : program = [%s]' % (','.join(astrArguments)))
(returnCode, stdout, stderr) = exe_prog(astrArguments)
if bBUG_00000008_IS_STILL_ACTIVE:
logDebug('executeCommand : return code of [%s] = %d' % (','.join(astrArguments), returnCode))
logDebug('execute_command : return code of [%s] = %d' % (','.join(astrArguments), returnCode))
# for debugging purpose, log info in case the command failed
if returnCode != 0:
logDebug('executeCommand : return code of [%s] = %d' % (','.join(astrArguments), returnCode))
logDebug('executeCommand : stdout of [%s] = %s' % (','.join(astrArguments), stdout))
logDebug('executeCommand : stderr of [%s] = %s' % (','.join(astrArguments), stderr))
logDebug('execute_command : return code of [%s] = %d' % (','.join(astrArguments), returnCode))
logDebug('execute_command : stdout of [%s] = %s' % (','.join(astrArguments), stdout))
logDebug('execute_command : stderr of [%s] = %s' % (','.join(astrArguments), stderr))
return (returnCode, stdout, stderr)
def executeCommand(command):
# logDebug('executeCommand : command = ' + command)
(returnCode, stdout, stderr) = Lib.Util.executeCommand(command)
# logDebug('executeCommand : return code of "'+command+'" = '+str(returnCode))
def execute_command(command):
# logDebug('execute_command : command = ' + command)
(returnCode, stdout, stderr) = exe_comm(command)
# logDebug('execute_command : return code of "'+command+'" = '+str(returnCode))
return (returnCode, stdout, stderr)
def executeIpmiCommand(machineName, ipmiCommandArgs):
lomIpAddress = Lib.SimpaDbUtil.getLightOutManagementIpAddress(machineName)
def execute_ipmi_command(machineName, ipmiCommandArgs):
lomIpAddress = getLightOutManagementIpAddress(machineName)
lomPasswordFilepath = '/usr/local/etc/LightOutManagementPassword.txt'
astrProgram = ['ipmitool', '-U', 'admin', '-H', lomIpAddress, '-f', lomPasswordFilepath]
astrProgram.extend(ipmiCommandArgs)
# print 'executeIpmiCommand'
# print 'execute_ipmi_command'
# print astrProgram
bBUG_00000005_IS_STILL_ACTIVE = True
if bBUG_00000005_IS_STILL_ACTIVE:
# bug 00000005 causes ipmitool to randomly fail for no apparent reason (two consecutive calls might give different errors, these errors not always being timeouts). Therefore we try and try again, until the command succeeds. If we don't do this, cluster controller keeps stopping because ipmi commands fail. The effect of this hack is that the UNPLUGGED power state is no longer detected; therefore, with this hack, cluster controller is expecting all machines to be plugged.
bCommandSucceeded = False
while not bCommandSucceeded:
(returnCode, stdout, stderr) = executeProgram(astrProgram)
(returnCode, stdout, stderr) = execute_program(astrProgram)
if returnCode == 0:
bCommandSucceeded = True
else:
logWarning('the command "%s" failed. Retrying a bit later' % ' '.join(astrProgram))
log_warning('the command "%s" failed. Retrying a bit later' % ' '.join(astrProgram))
time.sleep(5) # wait for 5 seconds before the next attempt, in order not to saturate activity
else:
(returnCode, stdout, stderr) = executeProgram(astrProgram)
"""
sh-3.2# ipmitool -U admin -H 129.20.27.220 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State'
Unabled to establish a session with the BMC.
Command failed due to insufficient resources for session (0xFFFEF901)
-> this error means that the number of active conections to the BMC has reached the maximum (usually 5).
(returnCode, stdout, stderr) = execute_program(astrProgram)
# sh-3.2# ipmitool -U admin -H 129.20.27.220 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State'
# Unabled to establish a session with the BMC.
# Command failed due to insufficient resources for session (0xFFFEF901)
# -> this error means that the number of active conections to the BMC has reached the maximum (usually 5).
sh-3.2# ipmitool -U admin -H 129.20.27.212 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State'
Unabled to establish a session with the BMC.
Command failed due to Unknown (0xFFFEF923) (0xFFFEF923)
# sh-3.2# ipmitool -U admin -H 129.20.27.212 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State'
# Unabled to establish a session with the BMC.
# Command failed due to Unknown (0xFFFEF923) (0xFFFEF923)
sh-3.2# ipmitool -U admin -H 129.20.27.212 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State'
Unabled to establish a session with the BMC.
Command failed due to Timeout (0xFFFEF9C3)
"""
# sh-3.2# ipmitool -U admin -H 129.20.27.212 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State'
# Unabled to establish a session with the BMC.
# Command failed due to Timeout (0xFFFEF9C3)
return (returnCode, stdout, stderr)
def getPowerState(machineName):
def get_power_state(machineName):
ePowerState = PowerState.UNKNOWN
bPowerStateRead = False
iNumFailedAttempts = 0
while not bPowerStateRead:
(returnCode, stdout, stderr) = executeIpmiCommand(machineName, ['sensor', 'get', 'ACPI State'])
(returnCode, stdout, _stderr) = execute_ipmi_command(machineName, ['sensor', 'get', 'ACPI State'])
if returnCode == 0:
matchObj = re.search(r'\[(?P<AcpiState>S[0-9][^\:]*)\:', stdout)
bBUG_00000002_IS_STILL_ACTIVE = True
@ -83,7 +84,7 @@ def getPowerState(machineName):
# the following warning has been commented out because it pollutes the logs and apparently
# it's a 'feature' of all 4 core machines : if the machine is woken up using ipmitool, then
# no power on event is logged ...
# logWarning('degenerate ipmitool output for machine %s (see bug 00000002). Assuming power in on because that''s what I noticed when I had the case.' % machineName)
# log_warning('degenerate ipmitool output for machine %s (see bug 00000002). Assuming power in on because that''s what I noticed when I had the case.' % machineName)
return PowerState.ON
else:
assert matchObj
@ -103,31 +104,31 @@ def getPowerState(machineName):
iMAX_NUM_ATTEMPTS = 5
iNumFailedAttempts += 1
if iNumFailedAttempts < iMAX_NUM_ATTEMPTS:
logWarning('failed to read the power state of %s. I\'ll try a again a bit later....' % machineName)
log_warning('failed to read the power state of %s. I\'ll try a again a bit later....' % machineName)
time.sleep(5)
else:
logWarning('failed to read the power state of %s too many times. I assume this machine is unplugged' % machineName)
log_warning('failed to read the power state of %s too many times. I assume this machine is unplugged' % machineName)
ePowerState = PowerState.UNPLUGGED # too many attempts failed ... I guess it's because the machine is unplugged
bPowerStateRead = True
return ePowerState
def wakeUpMachine(machineName):
def wake_up_machine(machineName):
"""
this method seems more reliable than wake on lan (sometimes, sending wake on lan packet seems to have no effect)
@return true on success, false otherwise
@note I once had this method failing for no obvious reason.. maybe this command does not succeed if the machine is in a transition state
"""
(returnCode, stdout, stderr) = executeIpmiCommand(machineName, ['chassis', 'power', 'on'])
(returnCode, _stdout, _stderr) = execute_ipmi_command(machineName, ['chassis', 'power', 'on'])
bSuccess = (returnCode == 0) # this command can fail if the machine is manually unplugged for example
return bSuccess
def blockingPutMachineToSleep(machineName):
def blocking_put_machine_to_sleep(machineName):
"""
@return true on success, false otherwise
"""
logInfo('putting machine %s to sleep...' % machineName)
log_info('putting machine %s to sleep...' % machineName)
iMaxNumAttempts = 5
bSuccess = False
bBUG_239_IS_STILL_ALIVE = True
@ -135,36 +136,36 @@ def blockingPutMachineToSleep(machineName):
# note : each sleep order is not actually succeeding (god knows why). Therefore, we need to try again and again.
while not bSuccess:
# note : pmset must be executed as root
(returnCode, stdout, stderr) = executeProgram(['ssh', machineName, 'pmset sleepnow'])
(_returnCode, _stdout, _stderr) = execute_program(['ssh', machineName, 'pmset sleepnow'])
# check if the machine actually went to sleep
iMaxGoToSleepDuration = 30 # in seconds
iDelay = 0
while iDelay < iMaxGoToSleepDuration:
time.sleep(5)
iDelay += 5
ePowerState = getPowerState(machineName)
ePowerState = get_power_state(machineName)
if ePowerState == PowerState.SLEEP:
logInfo('machine %s is now sleeping (put to sleep succeeded)' % machineName)
log_info('machine %s is now sleeping (put to sleep succeeded)' % machineName)
return True
else:
if ePowerState != PowerState.ON:
logWarning('unexpectedly, powerState of %s is %s' % (machineName, PowerStateToStr(ePowerState)))
log_warning('unexpectedly, powerState of %s is %s' % (machineName, PowerStateToStr(ePowerState)))
assert ePowerState == PowerState.ON
iAttempt += 1
if iAttempt > iMaxNumAttempts:
if bBUG_239_IS_STILL_ALIVE:
logWarning('the attempt to put %s to sleep failed too many times (probably because of bug 239 (machine is in a weird state : power on but no ssh possible) ?)... giving up. ' % (machineName))
log_warning('the attempt to put %s to sleep failed too many times (probably because of bug 239 (machine is in a weird state : power on but no ssh possible) ?)... giving up. ' % (machineName))
return False
else:
logWarning('the attempt to put %s to sleep failed too many times... giving up' % (machineName))
log_warning('the attempt to put %s to sleep failed too many times... giving up' % (machineName))
return False
else:
logWarning('the attempt to put %s to sleep failed... trying again' % (machineName))
log_warning('the attempt to put %s to sleep failed... trying again' % (machineName))
return True
def blockingWakeUpMachine(machineName):
logInfo('waking up machine %s...' % machineName)
def blocking_wake_up_machine(machineName):
log_info('waking up machine %s...' % machineName)
numAttempts = 0
bWakeUpFailed = True
while bWakeUpFailed: # try more than once because sometimes for an unknown reason, the wake up order is ignored by the machine ... to be investigated
@ -172,45 +173,45 @@ def blockingWakeUpMachine(machineName):
iNumWakeUpAttempts = 0
bWakeUpMachineSucceeded = False
while not bWakeUpMachineSucceeded:
bWakeUpMachineSucceeded = wakeUpMachine(machineName)
bWakeUpMachineSucceeded = wake_up_machine(machineName)
iNumWakeUpAttempts += 1
# the previous command can fail if the machine is already in a transition
# in that case we try sevral times bevire giving up
if not bWakeUpMachineSucceeded:
if iNumWakeUpAttempts < iMaxNumWakeUpAttempts:
iDelay = 5
logWarning('wake up attempt %d of %s failed... I\'ll try again in %d seconds' % (iNumWakeUpAttempts, machineName, iDelay))
log_warning('wake up attempt %d of %s failed... I\'ll try again in %d seconds' % (iNumWakeUpAttempts, machineName, iDelay))
time.sleep(iDelay)
else:
logWarning('wake up attempt %d of %s failed too many times... giving up' % (iNumWakeUpAttempts, machineName))
log_warning('wake up attempt %d of %s failed too many times... giving up' % (iNumWakeUpAttempts, machineName))
return False # couldn't wake up to machine for whatever reason
bWakeUpFailed = False
# wait until the machine is operational
WAKEUPTIMEOUT = 5 * 60 # max number of seconds allowed for a machine to be alive after a wakeup request
wakeUpToAliveDuration = 0
while not Lib.SimpaDbUtil.isMachineResponding(machineName):
while not is_machine_responding(machineName):
time.sleep(5)
wakeUpToAliveDuration += 5
if wakeUpToAliveDuration > WAKEUPTIMEOUT:
# the wake up failed for whatever reason (power state changed manually ? wake up order got lost ?)
logWarning('%s took too long (more than %d seconds) to respond after a successful wakeup request.' % (machineName, WAKEUPTIMEOUT))
log_warning('%s took too long (more than %d seconds) to respond after a successful wakeup request.' % (machineName, WAKEUPTIMEOUT))
bWakeUpFailed = True
break
if bWakeUpFailed:
numAttempts += 1
if numAttempts >= 2:
logWarning('giving up waking up %s because the wake up request succeeded but the machine never actually came alive (and this too many times)' % (machineName))
log_warning('giving up waking up %s because the wake up request succeeded but the machine never actually came alive (and this too many times)' % (machineName))
return False # power state changed manually ?
else:
logWarning('attempting to wake up %s one more time' % (machineName))
log_warning('attempting to wake up %s one more time' % (machineName))
else:
# wake up completed
logInfo('Waking up of machine %s completed successfully' % machineName)
log_info('Waking up of machine %s completed successfully' % machineName)
return True
def onException(exception):
def on_exception(exception):
sys.stdout.flush()
strExceptionType = type(exception)
strMessage = 'exception %s : %s\n' % (strExceptionType, exception.message)
@ -224,11 +225,10 @@ def onException(exception):
try:
# I had the case (see bugzilla 234) where an assert failed in a child thread, but the main process kept going. I suspect that it was caused
# by a failure of sendTextMail... that's why I've embedded the sendmail inside a try except block, so that is this operation fails, then the
# by a failure of send_text_mail... that's why I've embedded the sendmail inside a try except block, so that is this operation fails, then the
# kill of the main process is still executed.
Lib.Util.sendTextMail('ClusterController <guillaume.raffy@univ-rennes1.fr>', 'guillaume.raffy@univ-rennes1.fr', 'ClusterController has stopped because of an exception', strMessage)
send_text_mail('ClusterController <guillaume.raffy@univ-rennes1.fr>', 'guillaume.raffy@univ-rennes1.fr', 'ClusterController has stopped because of an exception', strMessage)
except BaseException:
logError("Could not send the email to notify the administrator that cluster controller failed")
pass
executeCommand('kill -9 %d' % os.getpid()) # stop other threads immediately
execute_command('kill -9 %d' % os.getpid()) # stop other threads immediately
exit()

View File

@ -1,13 +1,13 @@
# Copyright Jon Berg , turtlemeat.com
import string,cgi,time
import cgi
import time
from os import curdir, sep
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer # pylint:disable=import-error
import threading
import Util
# import pri
from urlparse import urlparse, parse_qs
from urlparse import urlparse, parse_qs # pylint:disable=import-error
import xml.dom.minidom
from .Util import on_exception
# >>> url = 'http://example.com/?foo=bar&one=1'
# >>> parse_qs(urlparse(url).query)
# {'foo': ['bar'], 'one': ['1']}
@ -19,7 +19,7 @@ class MyHandler(BaseHTTPRequestHandler):
try:
paramsDict = parse_qs(urlparse(self.path).query)
if self.path.endswith(".html"):
f = open(curdir + sep + self.path) #self.path has /test.html
f = open(curdir + sep + self.path, encoding='utf8') # self.path has /test.html
# note that this potentially makes every file on your computer readable by the internet
self.send_response(200)
@ -47,10 +47,10 @@ class MyHandler(BaseHTTPRequestHandler):
controlledMachinesElement = doc.createElement("ControlledMachines")
doc.appendChild(controlledMachinesElement)
for machine in self.server.m_clusterController.m_clusterStatus.m_clusterNodes.values():
for machine in self.server.cluster_controller.cluster_status.cluster_nodes.values():
# Create the main <card> element
controlledMachineElement = doc.createElement("Machine")
controlledMachineElement.setAttribute("name", machine.getName())
controlledMachineElement.setAttribute("name", machine.get_name())
controlledMachinesElement.appendChild(controlledMachineElement)
# Print our newly created XML
self.wfile.write(doc.toprettyxml(indent=" "))
@ -58,24 +58,20 @@ class MyHandler(BaseHTTPRequestHandler):
if urlparse(self.path).path == '/SetControlOnMachine': # http://simpatix10.univ-rennes1.fr:8080/SetControlOnMachine?machineName=simpatix30&control=1
machineName = paramsDict['machineName'][0]
bControl = (paramsDict['control'][0] == '1')
self.server.m_clusterController.setControlOnMachine(machineName, bControl)
self.server.cluster_controller.set_control_on_machine(machineName, bControl)
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
if bControl == True:
if bControl is True:
self.wfile.write("%s is now controlled by ClusterController" % machineName)
else:
self.wfile.write("%s is no longer controlled by ClusterController" % machineName)
return
return
except IOError:
self.send_error(404, 'File Not Found: %s' % self.path)
def do_POST(self):
global rootnode
try:
ctype, pdict = cgi.parse_header(self.headers.getheader('content-type'))
if ctype == 'multipart/form-data':
@ -84,26 +80,30 @@ class MyHandler(BaseHTTPRequestHandler):
self.end_headers()
upfilecontent = query.get('upfile')
print "filecontent", upfilecontent[0]
self.wfile.write("<HTML>POST OK.<BR><BR>");
self.wfile.write(upfilecontent[0]);
print("filecontent", upfilecontent[0])
self.wfile.write("<HTML>POST OK.<BR><BR>")
self.wfile.write(upfilecontent[0])
except :
except BaseException:
pass
class WebServerThread(threading.Thread):
stop: bool
http_server: HTTPServer
def __init__(self, clusterController):
threading.Thread.__init__(self)
#self.m_clusterController = clusterController
self.m_bStop = False
self.m_httpServer = HTTPServer(('', 8080), MyHandler)
self.m_httpServer.m_clusterController = clusterController
# self.cluster_controller = clusterController
self.stop = False
self.http_server = HTTPServer(('', 8080), MyHandler)
self.http_server.cluster_controller = clusterController
def run(self):
try:
while not self.m_bStop:
self.m_httpServer.handle_request()
#self.m_httpServer.serve_forever()
except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
self.m_httpServer.socket.close()
Util.onException(exception)
while not self.stop:
self.http_server.handle_request()
# self.http_server.serve_forever()
except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
self.http_server.socket.close()
on_exception(exception)

View File

@ -12,35 +12,35 @@ class ClusterNodeSensorsReadings:
POWERSTATE_SLEEP=3
"""
def __init__(self, clusterNodeName):
self.m_clusterNodeName = clusterNodeName
self.m_sensors = {}
# self.m_powerState = ClusterNodeStatus.POWERSTATE_UNKNOWN
self.cluster_node_name = clusterNodeName
self.sensors = {}
# self.power_state = ClusterNodeStatus.POWERSTATE_UNKNOWN
return
def addSensor(self, sensor):
self.m_sensors[sensor.m_name] = sensor
self.sensors[sensor.name] = sensor
def dump(self):
for key, sensor in self.m_sensors.items():
for key, sensor in self.sensors.items():
sensor.dump()
return
# def getPowerState(self):
# return self.m_powerState
# def get_power_state(self):
# return self.power_state
def getLowestTemperature(self):
# log('ClusterNodeSensorsReadings::getLowestTemperature : start')
lowestTemperature = 0.0
lowestTemperatureIsDefined = False
for key, sensor in self.m_sensors.items():
for key, sensor in self.sensors.items():
# log('ClusterNodeSensorsReadings::getLowestTemperature : start')
if sensor.typeName() == 'Temperature':
sensor.m_temperature
sensor.temperature
if lowestTemperatureIsDefined:
if sensor.m_temperature < lowestTemperature:
lowestTemperature = sensor.m_temperature
if sensor.temperature < lowestTemperature:
lowestTemperature = sensor.temperature
else:
lowestTemperature = sensor.m_temperature
lowestTemperature = sensor.temperature
lowestTemperatureIsDefined = True
assert lowestTemperatureIsDefined
# log('ClusterNodeSensorsReadings::getLowestTemperature : end')

View File

@ -31,12 +31,12 @@ class IpmiTool202Parser:
rpms = self.parseFanSensorOutput(f)
if temperature is not None:
sensor = FanSensor(sensorName)
sensor.m_rpms = rpms
sensor.rpms = rpms
elif sensorType == 'Temperature':
temperature = self.parseTemperatureSensorOutput(f)
if temperature is not None:
sensor = TemperatureSensor(sensorName)
sensor.m_temperature = temperature
sensor.temperature = temperature
else:
# ignoring other sensors
sensor = None

View File

@ -22,10 +22,10 @@ class IpmiTool218Parser:
sensor = None
if sensorUnit == 'degrees C':
sensor = TemperatureSensor(sensorName)
sensor.m_temperature = float(sensorValue)
sensor.temperature = float(sensorValue)
elif sensorUnit == 'RPM':
sensor = FanSensor(sensorName)
sensor.m_rpms = float(sensorValue)
sensor.rpms = float(sensorValue)
else:
None
if sensor:

View File

@ -1,23 +1,40 @@
from typing import Optional
class Sensor:
def __init__(self, sensorName):
self.m_name = sensorName
self.m_isValid = True # false if this sensor is not actually present on the target machine
self.name = sensorName
self.is_valid = True # false if this sensor is not actually present on the target machine
return
def dump(self):
print self.m_name
print(self.name)
class FanSensor(Sensor):
rpms: Optional[float]
def __init__(self, sensorName):
Sensor.__init__(self, sensorName)
self.rpms = None
def dump(self):
print 'Fan \'', self.m_name, '\' rpm=',self.m_rpms
print('Fan \'', self.name, '\' rpm=', self.rpms)
def typeName(self):
return 'Fan'
class TemperatureSensor(Sensor):
temperature: Optional[float]
def __init__(self, sensorName):
Sensor.__init__(self, sensorName)
self.temperature = None
def dump(self):
print 'Temperature \'', self.m_name, '\' temperature=',self.m_temperature
print('Temperature \'', self.name, '\' temperature=', self.temperature)
def typeName(self):
return 'Temperature'

View File

@ -8,31 +8,31 @@ else:
import re
from .wol import wake_on_lan
import os
from .Util import executeProgram, executeCommand, log
from .Util import execute_program, execute_command, log
import abc
import sqlite3
from .mysql2sqlite import mysql_to_sqlite
def isMachineResponding(machineName):
(returnCode, stdout, stderr) = executeProgram(['ping', '-o', '-t', '1', machineName])
# log( 'isMachineResponding : result of command %s : %d' % (command, returnCode) )
def is_machine_responding(machineName):
(returnCode, stdout, stderr) = execute_program(['ping', '-o', '-t', '1', machineName])
# log( 'is_machine_responding : result of command %s : %d' % (command, returnCode) )
if returnCode == 0:
return True
else:
bMachineNameIsNotKnown = (returnCode == 68)
# bMachineNameIsNotKnown = (returnCode == 68)
bMachineIsNotResponding = (returnCode == 2)
if bMachineIsNotResponding is False:
bBUG_00000004_IS_STILL_ALIVE = True
if bBUG_00000004_IS_STILL_ALIVE is True and returnCode == 142:
log('isMachineResponding : bug00000004 Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName))
log('is_machine_responding : bug00000004 Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName))
# don't stop the program until we understand bug00000004
elif bBUG_00000004_IS_STILL_ALIVE is True and returnCode == -14: # I had this error code on 07/09/2009 20:38 but I don't know yet what that means
log('isMachineResponding : bug00000004 Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName))
log('is_machine_responding : bug00000004 Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName))
# don't stop the program until we understand bug00000004
else:
log('isMachineResponding : Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName))
log('is_machine_responding : Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName))
assert False
return False
@ -46,7 +46,6 @@ class ISqlDatabaseBackend(object):
"""
:param str sql_query: the sql query to perform
"""
pass
class RemoteMysqlDb(ISqlDatabaseBackend):
@ -70,7 +69,7 @@ class RemoteMysqlDb(ISqlDatabaseBackend):
:param str sql_query: the sql query to perform
"""
self._conn.query(sql_query)
rows = conn.store_result()
rows = self._conn.store_result()
return rows
@ -87,7 +86,7 @@ class SqlFile(ISqlDatabaseBackend):
# - the file is stored on a solid state disk
try:
os.remove(sqlite_db_path)
except:
except BaseException:
pass
check_same_thread = False
# this is to prevent the following error when run from apache/django : SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 139672342353664 and this is thread id 139672333960960
@ -95,7 +94,7 @@ class SqlFile(ISqlDatabaseBackend):
# If set False, the returned connection may be shared across multiple threads. When using multiple threads with the same connection writing operations should be serialized by the user to avoid data corruption
# I hope it's safe here but I'm not 100% sure though. Anyway, if the database gets corrupt, it not a big deal since this memory resident database gets reconstructed from the sql file...
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
with open(str(self._sql_file_path), 'r') as f: # str conversion has been added to support older versions of python in which open don't accept arguments of type Path
with open(str(self._sql_file_path), 'r', encoding='utf8') as f: # str conversion has been added to support older versions of python in which open don't accept arguments of type Path
sql = f.read() # watch out for built-in `str`
# print(sql)
self._cur = self._con.cursor()
@ -111,7 +110,6 @@ class SqlFile(ISqlDatabaseBackend):
"""
:param str sql_query: the sql query to perform
"""
pass
self._cur.execute(sql_query)
rows = self._cur.fetchall()
return rows
@ -196,7 +194,7 @@ def getLightOutManagementIpAddress(machineName):
return ipAddress
def getClusterMachinesNames():
def get_cluster_machines_names():
clusterMachinesNames = []
conn = MySQLdb.connect('simpatix10', 'simpadb_reader', '', 'simpadb')
assert conn
@ -223,14 +221,12 @@ def machineSupportsIpmi(machineName):
def putToSleep(machineName):
# note : pmset must be executed as root
(returnCode, stdout, stderr) = executeCommand(['ssh', machineName, 'pmset sleepnow'])
"""
print returnCode
print 'stdout :'
print stdout
print 'stderr :'
print stderr
"""
(returnCode, stdout, _stderr) = execute_command(['ssh', machineName, 'pmset sleepnow'])
# print returnCode
# print 'stdout :'
# print stdout
# print 'stderr :'
# print stderr
assert returnCode == 0
# check if the command succeeded by looking at the output (that's the only way I found)
f = StringIO.StringIO(stdout)
@ -255,7 +251,7 @@ def isNonRespondingMachineSleeping(machineName):
"""
wakeUp(machineName)
time.sleep(120)
if isMachineResponding(machineName):
if is_machine_responding(machineName):
putToSleep(machineName)
time.sleep(30) # allow a little time to make sure the machine is ready to receive other wake on lan messages
return True
@ -264,11 +260,9 @@ def isNonRespondingMachineSleeping(machineName):
if __name__ == '__main__':
"""
for i in range(30):
machineName = 'simpatix%d' % (i+10)
print 'lom ip of %s is %s' % (machineName, getLightOutManagementIpAddress(machineName))
"""
# for i in range(30):
# machineName = 'simpatix%d' % (i+10)
# print 'lom ip of %s is %s' % (machineName, getLightOutManagementIpAddress(machineName))
wakeUp('simpatix21')
# print putToSleep('simpatix13')
# print isNonRespondingMachineSleeping('simpatix13')

View File

@ -1,6 +1,5 @@
import time
import subprocess
import io
import re
import logging
# from wol import *
@ -16,7 +15,7 @@ else:
from email.mime.text import MIMEText
def sendTextMail(strFrom, to, strSubject, text):
def send_text_mail(strFrom, to, strSubject, text):
# from = "SimpaCluster <guillaume.raffy@univ-rennes1.fr>"
mail = MIMEText(text)
mail['From'] = strFrom
@ -30,12 +29,14 @@ def sendTextMail(strFrom, to, strSubject, text):
class Error(Exception):
message: str
def __init__(self, strMessage):
self.m_strMessage = strMessage
self.message = strMessage
def getHostName():
(returnCode, stdout, stderr) = executeProgram(['hostname', '-s'])
(returnCode, stdout, stderr) = execute_program(['hostname', '-s'])
if returnCode != 0:
raise Error(stderr)
strHostName = re.sub(r"\n", "", stdout)
@ -46,18 +47,18 @@ def log(message):
print(time.asctime(time.localtime()) + ' : ' + message)
def executeProgram(astrArguments):
# log('executeProgram : program [%s]' % (','.join(astrArguments)))
def execute_program(astrArguments):
# log('execute_program : program [%s]' % (','.join(astrArguments)))
popen = subprocess.Popen(astrArguments, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # bufsize=1 seems to prevent deadlocks that happen 50% the time
stdout, stderr = popen.communicate()
# popen.wait()
result = (popen.returncode, stdout.decode(), stderr)
# log('executeProgram : command %s popen.pid = %d' % (astrArguments[0], popen.pid))
# log('execute_program : command %s popen.pid = %d' % (astrArguments[0], popen.pid))
# os.kill(popen.pid, signal.SIGTERM)
return result
def executeCommand(command):
def execute_command(command):
"""
executes the shell command such as 'set x=1; myprog $x'
"""
@ -69,25 +70,25 @@ def executeCommand(command):
return result
def executeCommandOn(target_machine_fqdn: str, command: str, user: str = None):
def execute_commandOn(target_machine_fqdn: str, command: str, user: str = None):
"""
execute command on a local or remote machine (using ssh then)
:param str user: if not None, the user that should be used to execute the command (instead of the current user)
"""
logging.debug("executing %s on %s as %s" % (command, target_machine_fqdn, user))
logging.debug("executing %s on %s as %s", command, target_machine_fqdn, user)
if getHostName() == target_machine_fqdn.split('.')[0]:
if user is not None:
# su -c "ls -l /tmp" graffy
result = executeCommand("su -c '%s' %s" % (command, user))
result = execute_command("su -c '%s' %s" % (command, user))
else:
result = executeCommand(command)
result = execute_command(command)
else:
if user is not None:
target = '%s@%s' % (user, target_machine_fqdn)
else:
target = target_machine_fqdn
result = executeProgram(['ssh', target, "%s" % command])
logging.debug("finished executing %s on %s as %s" % (command, target_machine_fqdn, user))
result = execute_program(['ssh', target, "%s" % command])
logging.debug("finished executing %s on %s as %s", command, target_machine_fqdn, user)
return result
@ -97,16 +98,16 @@ def getUpsStatus():
def __init__(self):
HTMLParser.__init__(self)
self.TokenList = []
self.token_list = []
def handle_data(self, data):
data = data.strip()
if data and len(data) > 0:
self.TokenList.append(data)
self.token_list.append(data)
# print data
def GetTokenList(self):
return self.TokenList
def get_token_list(self):
return self.token_list
from urllib.request import urlopen
try:
@ -114,21 +115,20 @@ def getUpsStatus():
f = urlopen(url)
res = f.read()
f.close()
except:
except BaseException:
print("bad read")
return
h = MyHTMLParser()
h.feed(res)
tokensList = h.GetTokenList() # noqa:F841
_tokensList = h.get_token_list() # noqa:F841
raise NotImplementedError('the implementation is not complete')
if __name__ == '__main__':
from SimpaDbUtil import wakeUp
"""
for i in range(30):
machineName = 'simpatix%d' % (i+10)
print 'lom ip of %s is %s' % (machineName, getLightOutManagementIpAddress(machineName))
"""
from .SimpaDbUtil import wakeUp
# for i in range(30):
# machineName = 'simpatix%d' % (i+10)
# print 'lom ip of %s is %s' % (machineName, getLightOutManagementIpAddress(machineName))
wakeUp('simpatix21')
# print putToSleep('simpatix13')
# print isNonRespondingMachineSleeping('simpatix13')

View File

@ -20,7 +20,7 @@ class CoclutoTestCase(unittest.TestCase):
qstat_output = file.read()
# qstatParser = ClusterController.QstatParser()
qstatParser = QstatParser()
job_state = qstatParser.parseQstatOutput(qstat_output, cluster_domain='ipr.univ-rennes1.fr')
job_state = qstatParser.parse_qstat_output(qstat_output, cluster_domain='ipr.univ-rennes1.fr')
self.assertIsInstance(job_state, JobsState)