From 9f4a80b11eae66ac342a1dc10852e2bb1b9c5c00 Mon Sep 17 00:00:00 2001 From: Guillaume Raffy Date: Fri, 14 Jun 2024 15:52:32 +0200 Subject: [PATCH] fixed styling errors and added more type hinting to increase maintainability of cocluto - all styling errors are now fixed, but there are still warnings and information - most functions have been converted to snake case - most functions now have type hinting work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3873] --- .../ClusterController/ClusterController.py | 267 +++++++++-------- cocluto/ClusterController/ClusterNode.py | 191 ++++++------ .../ClusterNodeStatusUpdater.py | 206 +++++++------ cocluto/ClusterController/ClusterStatus.py | 280 +++++++++--------- cocluto/ClusterController/Install.py | 2 +- cocluto/ClusterController/Job.py | 13 +- cocluto/ClusterController/JobsState.py | 82 ++--- cocluto/ClusterController/JobsStateUpdater.py | 56 ++-- cocluto/ClusterController/Log.py | 8 +- cocluto/ClusterController/PowerState.py | 5 +- cocluto/ClusterController/QstatParser.py | 139 +++++---- cocluto/ClusterController/QueueMachine.py | 103 ++++--- cocluto/ClusterController/SlotAllocator.py | 130 ++++---- cocluto/ClusterController/SunGridEngine.py | 58 ++-- cocluto/ClusterController/TestBug00000003.py | 45 ++- cocluto/ClusterController/Util.py | 128 ++++---- cocluto/ClusterController/WebServer.py | 110 +++---- cocluto/Ipmi/ClusterNodeSensorsReadings.py | 24 +- cocluto/Ipmi/IpmiTool202Parser.py | 4 +- cocluto/Ipmi/IpmiTool218Parser.py | 4 +- cocluto/Ipmi/Sensor.py | 29 +- cocluto/SimpaDbUtil.py | 50 ++-- cocluto/Util.py | 52 ++-- test/test_cocluto.py | 2 +- 24 files changed, 1037 insertions(+), 951 deletions(-) diff --git a/cocluto/ClusterController/ClusterController.py b/cocluto/ClusterController/ClusterController.py index fb7a4d2..5ad7eb4 100644 --- a/cocluto/ClusterController/ClusterController.py +++ b/cocluto/ClusterController/ClusterController.py @@ -1,21 +1,26 @@ #!/usr/bin/env python +from typing import Dict, Optional import sys -sys.path.insert(0, '..') import os import MySQLdb import threading -from Lib.Util import * -from Lib.SimpaDbUtil import * import time -from ClusterStatus import ClusterStatus -from SlotAllocator import DecoupledSlotAllocator -from Log import logDebug, logInfo -from ClusterNodeStatusUpdater import IWakeUpCompleteNotifier, ISleepCompleteNotifier -from SunGridEngine import SunGridEngine -from Util import log, onException -from WebServer import WebServerThread -from PowerState import PowerState -from HTMLParser import HTMLParser +from datetime import datetime +if sys.version_info < (3, 0): + from HTMLParser import HTMLParser +else: + from html.parser import HTMLParser +from ..Util import log +# from ..SimpaDbUtil import toto +from .ClusterNode import ClusterNodeId, ClusterNode +from .ClusterStatus import ClusterStatus +from .SlotAllocator import DecoupledSlotAllocator, SlotAllocator +from .Log import logDebug, log_info +from .ClusterNodeStatusUpdater import IWakeUpCompleteNotifier, ISleepCompleteNotifier +from .SunGridEngine import SunGridEngine +from .Util import on_exception +from .WebServer import WebServerThread +from .PowerState import PowerState VERSION = '1.18' @@ -23,38 +28,38 @@ VERSION = '1.18' class MyHTMLParser(HTMLParser): def __init__(self): HTMLParser.__init__(self) - self.TokenList = [] + self.token_list = [] def handle_data(self, data): data = data.strip() if data and len(data) > 0: - self.TokenList.append(data) + self.token_list.append(data) # print data - def GetTokenList(self): - return self.TokenList + def get_token_list(self): + return self.token_list class WakeUpCompleteNotifier(IWakeUpCompleteNotifier): def __init__(self, machineName, clusterController): - self.m_machineName = machineName - self.m_clusterController = clusterController + self.machine_name = machineName + self.cluster_controller = clusterController - def onWakeUpComplete(self): - logDebug('WakeUpCompleteNotifier::onWakeUpComplete : start') - self.m_clusterController.onMachineWakeUpComplete(self.m_machineName) + def on_wake_up_complete(self): + logDebug('WakeUpCompleteNotifier::on_wake_up_complete : start') + self.cluster_controller.onMachineWakeUpComplete(self.machine_name) class SleepCompleteNotifier(ISleepCompleteNotifier): def __init__(self, machineName, clusterController): - self.m_machineName = machineName - self.m_clusterController = clusterController + self.machine_name = machineName + self.cluster_controller = clusterController - def onSleepComplete(self, bSleepSucceeded): - logDebug('SleepCompleteNotifier::onSleepComplete : start') - self.m_clusterController.onMachineSleepComplete(self.m_machineName, bSleepSucceeded) + def on_sleep_complete(self, bSleepSucceeded): + logDebug('SleepCompleteNotifier::on_sleep_complete : start') + self.cluster_controller.onMachineSleepComplete(self.machine_name, bSleepSucceeded) def jouleToKwh(fEnergyInJoules): @@ -79,23 +84,36 @@ class ClusterController: jobs (eg add some machines to a queue). Mechanism to let user get priority """ + cluster_status: ClusterStatus + slot_allocator = SlotAllocator + machines_that_need_wake_up: Dict[ClusterNodeId, ClusterNode] + machines_that_need_wake_up_lock: threading.Lock # to prevent concurrent access to machines_that_need_wake_up + machines_that_need_sleeping: Dict[ClusterNodeId, ClusterNode] + machines_that_need_sleeping_lock: threading.Lock # to prevent concurrent access to machines_that_need_sleeping + last_energy_status_log_time: Optional[datetime] + DELAY_BETWEEN_ENERGY_STATUS_LOGS: int # in seconds + session_id: Optional[int] # session (run) identifier in database + web_server: WebServerThread + stop: bool + stop_lock: threading.Lock # to prevent concurrent access to stop + def __init__(self): gridEngine = SunGridEngine() - self.m_clusterStatus = ClusterStatus(gridEngine) - self.m_slotAllocator = DecoupledSlotAllocator() # SimpleSlotAllocator() - self.m_machinesThatNeedWakeUp = {} - self.m_machinesThatNeedWakeupLock = threading.Lock() # to prevent concurrent access to m_machinesThatNeedWakeUp - self.m_machinesThatNeedSleeping = {} - self.m_machinesThatNeedSleepingLock = threading.Lock() # to prevent concurrent access to m_machinesThatNeedSleeping - self.m_lastEnergyStatusLogTime = None - self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60 # in seconds - self.m_iSessionId = None # session (run) identifier in database - self.m_webServer = WebServerThread(self) - self.m_bStop = False - self.m_bStopLock = threading.Lock() # to prevent concurrent access to m_bStop + self.cluster_status = ClusterStatus(gridEngine) + self.slot_allocator = DecoupledSlotAllocator() # SimpleSlotAllocator() pylint: disable=no-value-for-parameter + self.machines_that_need_wake_up = {} + self.machines_that_need_wake_up_lock = threading.Lock() + self.machines_that_need_sleeping = {} + self.machines_that_need_sleeping_lock = threading.Lock() + self.last_energy_status_log_time = None + self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60 + self.session_id = None + self.web_server = WebServerThread(self) + self.stop = False + self.stop_lock = threading.Lock() def getClusterStatus(self): - return self.m_clusterStatus + return self.cluster_status def log(self, message): print(message) @@ -104,79 +122,76 @@ class ClusterController: self.log("ClusterController::shutdownLeastImportantNode : start") def onMachineWakeUpComplete(self, machineName): - self.m_machinesThatNeedWakeupLock.acquire() - # logDebug('ClusterController::onMachineWakeUpComplete : machine %s old len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp))) - del self.m_machinesThatNeedWakeUp[machineName] - # logDebug('ClusterController::onMachineWakeUpComplete : machine %s new len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp))) - self.m_machinesThatNeedWakeupLock.release() + self.machines_that_need_wake_up_lock.acquire() + # logDebug('ClusterController::onMachineWakeUpComplete : machine %s old len(self.machines_that_need_wake_up) = %d' % (machineName,len(self.machines_that_need_wake_up))) + del self.machines_that_need_wake_up[machineName] + # logDebug('ClusterController::onMachineWakeUpComplete : machine %s new len(self.machines_that_need_wake_up) = %d' % (machineName,len(self.machines_that_need_wake_up))) + self.machines_that_need_wake_up_lock.release() logDebug('ClusterController::onMachineWakeUpComplete : removed %s from the list of machines that need waking up because it\'s now awake' % machineName) def onMachineSleepComplete(self, machineName, bSleepSucceeded): - self.m_machinesThatNeedSleepingLock.acquire() - # logDebug('ClusterController::onMachineSleepComplete : machine %s old len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp))) - del self.m_machinesThatNeedSleeping[machineName] - # logDebug('ClusterController::onMachineSleepComplete : machine %s new len(self.m_machinesThatNeedWakeUp) = %d' % (machineName,len(self.m_machinesThatNeedWakeUp))) - self.m_machinesThatNeedSleepingLock.release() + self.machines_that_need_sleeping_lock.acquire() + # logDebug('ClusterController::onMachineSleepComplete : machine %s old len(self.machines_that_need_wake_up) = %d' % (machineName,len(self.machines_that_need_wake_up))) + del self.machines_that_need_sleeping[machineName] + # logDebug('ClusterController::onMachineSleepComplete : machine %s new len(self.machines_that_need_wake_up) = %d' % (machineName,len(self.machines_that_need_wake_up))) + self.machines_that_need_sleeping_lock.release() if bSleepSucceeded: logDebug('ClusterController::onMachineSleepComplete : removed %s from the list of machines that need sleeping because it\'s now sleeping' % machineName) else: logDebug('ClusterController::onMachineSleepComplete : removed %s from the list of machines that need sleeping because it can\'t be put to sleep at the moment (eg a job just arrived)' % machineName) def getNumPendingWakeUps(self): - self.m_machinesThatNeedWakeupLock.acquire() - numPendingWakeUps = len(self.m_machinesThatNeedWakeUp) - self.m_machinesThatNeedWakeupLock.release() + self.machines_that_need_wake_up_lock.acquire() + numPendingWakeUps = len(self.machines_that_need_wake_up) + self.machines_that_need_wake_up_lock.release() return numPendingWakeUps def getNumPendingSleeps(self): - self.m_machinesThatNeedSleepingLock.acquire() - numPendingSleeps = len(self.m_machinesThatNeedSleeping) - self.m_machinesThatNeedSleepingLock.release() + self.machines_that_need_sleeping_lock.acquire() + numPendingSleeps = len(self.machines_that_need_sleeping) + self.machines_that_need_sleeping_lock.release() return numPendingSleeps def putIdleMachinesToSleep(self): - self.m_clusterStatus.m_lock.acquire() - idleMachines = self.m_clusterStatus.getIdleMachines() - # logInfo('idleMachines :') - self.m_machinesThatNeedToSleep = [] - for machineName, idleMachine in idleMachines.items(): - if idleMachine.getPowerState() == PowerState.ON: - # logInfo('\t%s' % machineName) - if idleMachine.getName() != 'simpatix10': # never put simpatix10 to sleep because it's the sge master and is also server for other things - self.m_machinesThatNeedSleeping[idleMachine.getName()] = idleMachine - self.m_clusterStatus.m_lock.release() + self.cluster_status.lock.acquire() + idleMachines = self.cluster_status.get_idle_machines() + # log_info('idleMachines :') + for _machineName, idleMachine in idleMachines.items(): + if idleMachine.get_power_state() == PowerState.ON: + # log_info('\t%s' % machineName) + if idleMachine.get_name() != 'simpatix10': # never put simpatix10 to sleep because it's the sge master and is also server for other things + self.machines_that_need_sleeping[idleMachine.get_name()] = idleMachine + self.cluster_status.lock.release() - listOfMachinesThatNeedSleeping = self.m_machinesThatNeedSleeping.values() # duplicate the list so that we don't iterate on m_machinesThatNeedSleeping, which could cause a runtime error because callbacks alter m_machinesThatNeedWakeUp + listOfMachinesThatNeedSleeping = self.machines_that_need_sleeping.values() # duplicate the list so that we don't iterate on machines_that_need_sleeping, which could cause a runtime error because callbacks alter machines_that_need_wake_up for machine in listOfMachinesThatNeedSleeping: - logInfo('ClusterController::putIdleMachinesToSleep : requesting sleep for %s because it\'s idle' % machine.getName()) - machine.requestSleep(SleepCompleteNotifier(machine.getName(), self)) + log_info('ClusterController::putIdleMachinesToSleep : requesting sleep for %s because it\'s idle' % machine.get_name()) + machine.request_sleep(SleepCompleteNotifier(machine.get_name(), self)) # pylint: disable=no-value-for-parameter if len(listOfMachinesThatNeedSleeping) != 0: # hack : wait until the sleep requests are handled so that we don't request the same machine to sleep multiple times while self.getNumPendingSleeps() > 0: time.sleep(1) - def wakeUpMachinesForPendingJobs(self): + def wake_up_machinesForPendingJobs(self): listOfMachinesThatNeedWakeUp = [] - self.m_clusterStatus.m_lock.acquire() - pendingJobs = self.m_clusterStatus.getPendingJobs() - """ - logInfo('pending jobs :') - for job in pendingJobs.values(): - logInfo('\t%d' % job.getId().asStr()) - """ + self.cluster_status.lock.acquire() + pendingJobs = self.cluster_status.get_pending_jobs() + # log_info('pending jobs :') + # for job in pendingJobs.values(): + # log_info('\t%d' % job.getId().asStr()) if len(pendingJobs) != 0: - self.m_machinesThatNeedWakeUp = self.m_slotAllocator.getMachinesThatNeedWakeUp(pendingJobs, self.m_clusterStatus) - if len(self.m_machinesThatNeedWakeUp) == 0: - None - # logInfo('ClusterController::updateNormalState : no machine needs waking up') + self.machines_that_need_wake_up = self.slot_allocator.get_machinesThatNeedWakeUp(pendingJobs, self.cluster_status) + if len(self.machines_that_need_wake_up) == 0: + pass + # log_info('ClusterController::updateNormalState : no machine needs waking up') else: - listOfMachinesThatNeedWakeUp = self.m_machinesThatNeedWakeUp.values() # duplicate the list so that we don't iterate on m_machinesThatNeedWakeUp, which would cause a runtime error because callbacks alter m_machinesThatNeedWakeUp + listOfMachinesThatNeedWakeUp = self.machines_that_need_wake_up.values() # duplicate the list so that we don't iterate on machines_that_need_wake_up, which would cause a runtime error because callbacks alter machines_that_need_wake_up for machine in listOfMachinesThatNeedWakeUp: - logInfo('ClusterController::wakeUpMachinesForPendingJobs : requesting wake up for ' + machine.getName()) - machine.requestWakeUp(WakeUpCompleteNotifier(machine.getName(), self)) - self.m_clusterStatus.m_lock.release() + log_info('ClusterController::wake_up_machinesForPendingJobs : requesting wake up for ' + machine.get_name()) + machine.request_wake_up(WakeUpCompleteNotifier(machine.get_name(), self)) # pylint: disable=no-value-for-parameter + self.cluster_status.lock.release() if len(listOfMachinesThatNeedWakeUp) != 0: # hack : wait until the wakeup requests are handled so that a later sleep request doesn't cancel it @@ -184,16 +199,16 @@ class ClusterController: while self.getNumPendingWakeUps() > 0: time.sleep(1) iSGE_CHEK_RUNNABLE_JOBS_DELAY = 60 * 5 # max time it takes for sge between the fact that a queued job is runnable and SGE actually starting it (I've put a long time here because sometimes, qstat takes a long time to ralise that the machine is available after I wake it up) - logInfo('ClusterController::wakeUpMachinesForPendingJobs : all required machines are awake. Now give %d seconds to SGE to allocate slots.' % iSGE_CHEK_RUNNABLE_JOBS_DELAY) + log_info('ClusterController::wake_up_machinesForPendingJobs : all required machines are awake. Now give %d seconds to SGE to allocate slots.' % iSGE_CHEK_RUNNABLE_JOBS_DELAY) # wait until SGE has a chance to allocate slots time.sleep(iSGE_CHEK_RUNNABLE_JOBS_DELAY) # note : this is annoying because it blocks the main thread. This could be improved if we forbid the machines to go to sleep for that much time.... - logInfo('ClusterController::wakeUpMachinesForPendingJobs : end of the delay given to SGE to allocate slots') + log_info('ClusterController::wake_up_machinesForPendingJobs : end of the delay given to SGE to allocate slots') def updateNormalState(self): # attempt to shut down machines that are idle self.putIdleMachinesToSleep() # wake up necessary machines if there are pending jobs - self.wakeUpMachinesForPendingJobs() + self.wake_up_machinesForPendingJobs() def storeSessionInDatabase(self): conn = MySQLdb.connect('simpatix10', 'clusterctrl', '', 'clustercontroller') @@ -207,7 +222,7 @@ class ClusterController: iSessionId = r.fetch_row()[0][0] # stores information about the session - sqlCommand = "INSERT INTO `sessions_desc` (`start_time`, end_time, `program_version`, `machine_name`, `pid`, num_controlled_machines) VALUES (NOW(), NOW(), '%s', 'simpatix10', %d, %d);" % (VERSION, os.getpid(), len(self.m_clusterStatus.m_clusterNodes)) + sqlCommand = "INSERT INTO `sessions_desc` (`start_time`, end_time, `program_version`, `machine_name`, `pid`, num_controlled_machines) VALUES (NOW(), NOW(), '%s', 'simpatix10', %d, %d);" % (VERSION, os.getpid(), len(self.cluster_status.cluster_nodes)) print(sqlCommand) conn.query(sqlCommand) @@ -225,64 +240,62 @@ class ClusterController: assert conn # update energy savings for the current session - sqlCommand = "UPDATE session_to_energy_savings SET energy_savings_kwh=%f WHERE session_id=%d;" % (jouleToKwh(self.m_clusterStatus.getEnergySavings()), self.m_iSessionId) + sqlCommand = "UPDATE session_to_energy_savings SET energy_savings_kwh=%f WHERE session_id=%d;" % (jouleToKwh(self.cluster_status.get_energy_savings()), self.session_id) print(sqlCommand) conn.query(sqlCommand) # update the end time of the current session - sqlCommand = "UPDATE sessions_desc SET end_time=NOW() WHERE session_id=%d;" % (self.m_iSessionId) + sqlCommand = "UPDATE sessions_desc SET end_time=NOW() WHERE session_id=%d;" % (self.session_id) print(sqlCommand) conn.query(sqlCommand) conn.close() - def setControlOnMachine(self, machineName, bControl): + def set_control_on_machine(self, machineName, bControl): """ adds or removes the control of ClusterController on the given machine """ - self.m_clusterStatus.setControlOnMachine(machineName, bControl) + self.cluster_status.set_control_on_machine(machineName, bControl) def run(self): """ """ - self.m_iSessionId = self.storeSessionInDatabase() + self.session_id = self.storeSessionInDatabase() log("storeSessionInDatabase completed") DELAY_BETWEEN_MEASURES = 10 # in seconds - self.m_clusterStatus.startReadingThreads() - self.m_webServer.start() - while not self.m_clusterStatus.isReady(): + self.cluster_status.start_reading_threads() + self.web_server.start() + while not self.cluster_status.is_ready(): log('waiting for system to be ready') time.sleep(1) - None - logInfo('ClusterController::run : cluster initial readings have completed') + log_info('ClusterController::run : cluster initial readings have completed') startTime = time.localtime() - while not self.m_bStop: + while not self.stop: currentTime = time.time() - # clusterStatus.m_nodesStatus['simpatix10'].dump() - if (not self.m_lastEnergyStatusLogTime) or (currentTime > (self.m_lastEnergyStatusLogTime + self.DELAY_BETWEEN_ENERGY_STATUS_LOGS)): - iNumMachines = len(self.m_clusterStatus.m_clusterNodes) + if (not self.last_energy_status_log_time) or (currentTime > (self.last_energy_status_log_time + self.DELAY_BETWEEN_ENERGY_STATUS_LOGS)): + iNumMachines = len(self.cluster_status.cluster_nodes) iNumMachinesOn = 0 iNumSleepingMachines = 0 - for machine in self.m_clusterStatus.m_clusterNodes.values(): - ePowerState = machine.getPowerState() + for machine in self.cluster_status.cluster_nodes.values(): + ePowerState = machine.get_power_state() if ePowerState == PowerState.ON: iNumMachinesOn += 1 elif ePowerState == PowerState.SLEEP: iNumSleepingMachines += 1 - logInfo('%d machines (%d ON, %d SLEEPING)' % (iNumMachines, iNumMachinesOn, iNumSleepingMachines)) - iNumSlots = self.m_clusterStatus.getNumControlledSlots() - iNumUsedSlots = self.m_clusterStatus.getNumUsedSlots() - iNumWastedSlots = self.m_clusterStatus.getNumWastedSlots() - iNumSleepingSlots = self.m_clusterStatus.getNumSleepingSlots() - logInfo('%d slots (%d used, %d wasted, %d sleeping)' % (iNumSlots, iNumUsedSlots, iNumWastedSlots, iNumSleepingSlots)) - logInfo('cluster estimated power consumption : %f W (saving from cluster controller : %f W)' % (self.m_clusterStatus.getCurrentPowerConsumption(), self.m_clusterStatus.getCurrentPowerSavings())) - logInfo('cluster estimated energy consumption since %s : %f kWh (saving from cluster controller : %f kWh)' % (time.asctime(startTime), jouleToKwh(self.m_clusterStatus.getEnergyConsumption()), jouleToKwh(self.m_clusterStatus.getEnergySavings()))) + log_info('%d machines (%d ON, %d SLEEPING)' % (iNumMachines, iNumMachinesOn, iNumSleepingMachines)) + iNumSlots = self.cluster_status.get_num_controlled_slots() + iNumUsedSlots = self.cluster_status.get_num_used_slots() + iNumWastedSlots = self.cluster_status.get_num_wasted_slots() + iNumSleepingSlots = self.cluster_status.get_num_sleeping_slots() + log_info('%d slots (%d used, %d wasted, %d sleeping)' % (iNumSlots, iNumUsedSlots, iNumWastedSlots, iNumSleepingSlots)) + log_info('cluster estimated power consumption : %f W (saving from cluster controller : %f W)' % (self.cluster_status.get_current_power_consumption(), self.cluster_status.get_current_power_savings())) + log_info('cluster estimated energy consumption since %s : %f kWh (saving from cluster controller : %f kWh)' % (time.asctime(startTime), jouleToKwh(self.cluster_status.get_energy_consumption()), jouleToKwh(self.cluster_status.get_energy_savings()))) self.updateSessionEnergyConsumptionInDatabase() - self.m_lastEnergyStatusLogTime = currentTime + self.last_energy_status_log_time = currentTime self.updateNormalState() time.sleep(DELAY_BETWEEN_MEASURES) - self.m_clusterStatus.stopReadingThreads() + self.cluster_status.stop_reading_threads() def storeClusterNodeStatus(clusterNodeStatus): @@ -290,19 +303,17 @@ def storeClusterNodeStatus(clusterNodeStatus): conn = MySQLdb.connect('simpatix10', 'root', '', 'simpa_measurements') assert conn # conn.query("""INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('titi', 2000, NOW());""") - ''' - conn.query("""SELECT * FROM fan_rpm_logs""") - r=conn.store_result() - print r.fetch_row()[0] - ''' - for key, sensor in clusterNodeStatus.m_sensors.items(): - sensorId = clusterNodeStatus.m_clusterNodeName + '_' + sensor.m_name + # conn.query("""SELECT * FROM fan_rpm_logs""") + # r=conn.store_result() + # print r.fetch_row()[0] + for _key, sensor in clusterNodeStatus.sensors.items(): + sensorId = clusterNodeStatus.cluster_node_name + '_' + sensor.name if sensor.typeName() == 'Fan': - sqlCommand = """INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.m_rpms) + """, NOW());""" + sqlCommand = """INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.rpms) + """, NOW());""" print(sqlCommand) conn.query(sqlCommand) elif sensor.typeName() == 'Temperature': - sqlCommand = """INSERT INTO `temperature_logs` (`temp_sensor_id`, `temperature`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.m_temperature) + """, NOW());""" + sqlCommand = """INSERT INTO `temperature_logs` (`temp_sensor_id`, `temperature`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.temperature) + """, NOW());""" print(sqlCommand) conn.query(sqlCommand) else: @@ -311,11 +322,11 @@ def storeClusterNodeStatus(clusterNodeStatus): if __name__ == '__main__': - # Lib.Util.sendTextMail('SimpaCluster ', 'guillaume.raffy@univ-rennes1.fr', 'mail subject', 'mail content') + # Lib.Util.send_text_mail('SimpaCluster ', 'guillaume.raffy@univ-rennes1.fr', 'mail subject', 'mail content') try: - logInfo('ClusterController v. %s starting....' % VERSION) - # executeCommand('ping -o -t 1 simpatix310 > /dev/null') - # print executeCommand('ssh simpatix10 "ipmitool sensor"') + log_info('ClusterController v. %s starting....' % VERSION) + # execute_command('ping -o -t 1 simpatix310 > /dev/null') + # print execute_command('ssh simpatix10 "ipmitool sensor"') # assert False, 'prout' controller = ClusterController() controller.run() @@ -323,4 +334,4 @@ if __name__ == '__main__': # except AssertionError, error: # except KeyboardInterrupt, error: except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt) - onException(exception) + on_exception(exception) diff --git a/cocluto/ClusterController/ClusterNode.py b/cocluto/ClusterController/ClusterNode.py index a32cc6a..2a90a22 100644 --- a/cocluto/ClusterController/ClusterNode.py +++ b/cocluto/ClusterController/ClusterNode.py @@ -1,142 +1,161 @@ -import threading -from PowerState import PowerState, PowerStateToStr -from ClusterNodeStatusUpdater import ClusterNodeStatusUpdater -import Lib.Util -import Lib.SimpaDbUtil -from Log import logInfo, logWarning +from typing import TYPE_CHECKING +from typing import Optional +from .PowerState import PowerState, PowerStateToStr +from .Log import log_info, log_warning +from .ClusterNodeStatusUpdater import ClusterNodeStatusUpdater +if TYPE_CHECKING: + from .ClusterStatus import ClusterStatus + from .SunGridEngine import SunGridEngine + from .ClusterController import SleepCompleteNotifier from datetime import datetime +ClusterNodeId = str # eg 'physix99' + class ClusterNode: """ the state of a machine node """ - def __init__(self, machineName, cluster, gridEngine): - self.m_name = machineName - self.m_cluster = cluster # the cluster this machine belongs to - self.m_requestedPowerState = PowerState.ON - self.m_powerState = PowerState.UNKNOWN - self.m_lastPowerStateTime = None # time at which the last value of self.m_powerState has been set - self.m_machineStatusUpdater = ClusterNodeStatusUpdater(machineName, self, gridEngine) - self.m_energyConsumption = 0.0 # estimate of the energy consumption of this machine since the start of cluster controller (in joules) - self.m_energySavings = 0.0 # estimate of the energy savings on this machine caused by the cluster controller since it started (in joules) + name: ClusterNodeId + cluster: 'ClusterStatus' # the cluster this machine belongs to + requested_power_state: PowerState + power_state: PowerState + last_power_state_time: Optional[datetime] # time at which the last value of self.power_state has been set + machine_status_updater: ClusterNodeStatusUpdater + energy_consumption: float # estimate of the energy consumption of this machine since the start of cluster controller (in joules) + energy_savings: float # estimate of the energy savings on this machine caused by the cluster controller since it started (in joules) - def getName(self): - return self.m_name + def __init__(self, machine_name: ClusterNodeId, cluster: 'ClusterStatus', grid_engine: 'SunGridEngine'): + self.name = machine_name + self.cluster = cluster # the cluster this machine belongs to + self.requested_power_state = PowerState.ON + self.power_state = PowerState.UNKNOWN + self.last_power_state_time = None # time at which the last value of self.power_state has been set + self.machine_status_updater = ClusterNodeStatusUpdater(machine_name, self, grid_engine) + self.energy_consumption = 0.0 # estimate of the energy consumption of this machine since the start of cluster controller (in joules) + self.energy_savings = 0.0 # estimate of the energy savings on this machine caused by the cluster controller since it started (in joules) - def isReady(self): - if self.m_powerState == PowerState.UNKNOWN: - # logInfo(self.m_name + ' is not ready (waiting for power state)') + def get_name(self) -> ClusterNodeId: + return self.name + + def is_ready(self) -> bool: + if self.power_state == PowerState.UNKNOWN: + # log_info(self.name + ' is not ready (waiting for power state)') return False - if self.m_powerState == PowerState.ON: + if self.power_state == PowerState.ON: return True - # log(self.m_name + ' is ready') + # log(self.name + ' is ready') return True - def getPowerState(self): - return self.m_powerState + def get_power_state(self) -> PowerState: + return self.power_state - def setShouldAlwaysBeOn(self): - self.m_machineStatusUpdater.setShouldAlwaysBeOn() - self.setPowerState(PowerState.ON) + def set_should_always_be_on(self): + self.machine_status_updater.set_should_always_be_on() + self.set_power_state(PowerState.ON) - def setPowerState(self, powerState): + def set_power_state(self, power_state: PowerState): bUpdateRequiredChecks = False - if self.m_powerState == PowerState.UNKNOWN: - logInfo('ClusterNode::setPowerState : ' + self.m_name + '\'s power state has been initialized to ' + PowerStateToStr(powerState)) - self.m_powerState = powerState - self.m_lastPowerStateTime = datetime.now() + if self.power_state == PowerState.UNKNOWN: + log_info('ClusterNode::set_power_state : ' + self.name + '\'s power state has been initialized to ' + PowerStateToStr(power_state)) + self.power_state = power_state + self.last_power_state_time = datetime.now() bUpdateRequiredChecks = True else: # update the estimation of energy consumption - self.updateEnergyMeasurements() + self.update_energy_measurements() # then change the power state - if self.m_powerState != powerState: - logInfo('ClusterNode::setPowerState : ' + self.m_name + '\'s power state has been changed to ' + PowerStateToStr(powerState)) - self.m_powerState = powerState - self.m_lastPowerStateTime = datetime.now() + if self.power_state != power_state: + log_info('ClusterNode::set_power_state : ' + self.name + '\'s power state has been changed to ' + PowerStateToStr(power_state)) + self.power_state = power_state + self.last_power_state_time = datetime.now() bUpdateRequiredChecks = True if bUpdateRequiredChecks: - if self.m_powerState == PowerState.ON: - self.m_machineStatusUpdater.m_bCheckPowerState = True - self.m_machineStatusUpdater.m_bCheckSensors = True - elif self.m_powerState == PowerState.OFF: - self.m_machineStatusUpdater.m_bCheckPowerState = True - self.m_machineStatusUpdater.m_bCheckSensors = False - elif self.m_powerState == PowerState.SLEEP: - self.m_machineStatusUpdater.m_bCheckPowerState = True - self.m_machineStatusUpdater.m_bCheckSensors = False - elif self.m_powerState == PowerState.UNPLUGGED: - self.m_machineStatusUpdater.m_bCheckPowerState = True - self.m_machineStatusUpdater.m_bCheckSensors = False + if self.power_state == PowerState.ON: + self.machine_status_updater.check_power_state = True + self.machine_status_updater.check_sensors = True + elif self.power_state == PowerState.OFF: + self.machine_status_updater.check_power_state = True + self.machine_status_updater.check_sensors = False + elif self.power_state == PowerState.SLEEP: + self.machine_status_updater.check_power_state = True + self.machine_status_updater.check_sensors = False + elif self.power_state == PowerState.UNPLUGGED: + self.machine_status_updater.check_power_state = True + self.machine_status_updater.check_sensors = False else: assert False - def onNewPowerStateReading(self, powerState): + def on_new_power_state_reading(self, power_state: PowerState): """ called when a new powerstate reading arrives """ - if powerState != self.getPowerState(): - if self.getPowerState() != PowerState.UNKNOWN: - logWarning('ClusterNode::onNewPowerStateReading : ' + self.m_name + '\'s power state has been (manually it seems) changed to ' + PowerStateToStr(powerState)) - self.setPowerState(powerState) + if power_state != self.get_power_state(): + if self.get_power_state() != PowerState.UNKNOWN: + log_warning('ClusterNode::on_new_power_state_reading : ' + self.name + '\'s power state has been (manually it seems) changed to ' + PowerStateToStr(power_state)) + self.set_power_state(power_state) - def getPowerConsumptionForPowerState(self, ePowerState): + def get_power_consumption_for_power_state(self, power_state: PowerState) -> float: """ returns the power consumption estimation (in watts) of this machine for the given power state """ fCurrentIntensity = 0.0 fCurrentVoltage = 220.0 # noticed on 26.08.2009 that putting 22 machines from sleep to on eats 17 A, resulting in difference of 0.77 A per machine - if ePowerState == PowerState.ON: + if power_state == PowerState.ON: fCurrentIntensity = 0.9 # value when the machine is doing nothing - elif ePowerState == PowerState.OFF: + elif power_state == PowerState.OFF: fCurrentIntensity = 0.1 - elif ePowerState == PowerState.SLEEP: + elif power_state == PowerState.SLEEP: fCurrentIntensity = 0.1 - elif ePowerState == PowerState.UNPLUGGED: + elif power_state == PowerState.UNPLUGGED: fCurrentIntensity = 0.0 else: assert False return fCurrentIntensity * fCurrentVoltage - def updateEnergyMeasurements(self): - timeInterval = datetime.now() - self.m_lastPowerStateTime - self.m_energyConsumption += self.getPowerConsumptionForPowerState(self.m_powerState) * timeInterval.seconds - self.m_energySavings += (self.getPowerConsumptionForPowerState(PowerState.ON) - self.getPowerConsumptionForPowerState(self.m_powerState)) * timeInterval.seconds - self.m_lastPowerStateTime = datetime.now() - # logDebug('energy savings on %s : %f J' %(self.getName(), self.m_energySavings)) + def update_energy_measurements(self): + timeInterval = datetime.now() - self.last_power_state_time + self.energy_consumption += self.get_power_consumption_for_power_state(self.power_state) * timeInterval.seconds + self.energy_savings += (self.get_power_consumption_for_power_state(PowerState.ON) - self.get_power_consumption_for_power_state(self.power_state)) * timeInterval.seconds + self.last_power_state_time = datetime.now() + # logDebug('energy savings on %s : %f J' %(self.get_name(), self.energy_savings)) - def getEnergyConsumption(self): + def get_energy_consumption(self) -> float: """ in joules """ - self.updateEnergyMeasurements() - return self.m_energyConsumption + self.update_energy_measurements() + return self.energy_consumption - def getPowerConsumption(self): - fCurrentPowerConsumption = self.getPowerConsumptionForPowerState(self.m_powerState) - # logDebug('getPowerConsumption of %s : %f (powerstate = %d)' % (self.getName(), fCurrentPowerConsumption, self.m_powerState)) + def get_power_consumption(self) -> float: + fCurrentPowerConsumption = self.get_power_consumption_for_power_state(self.power_state) + # logDebug('get_power_consumption of %s : %f (powerstate = %d)' % (self.get_name(), fCurrentPowerConsumption, self.power_state)) return fCurrentPowerConsumption - def getEnergySavings(self): - self.updateEnergyMeasurements() - return self.m_energySavings + def get_energy_savings(self) -> float: + self.update_energy_measurements() + return self.energy_savings - def onSleepFailedBecauseAJobJustArrived(self): - logInfo('%s was scheduled to sleep but the sleep is canceled because it\'s currently executing a new job' % self.m_name) + def on_sleep_because_a_job_just_arrived(self): + log_info('%s was scheduled to sleep but the sleep is canceled because it\'s currently executing a new job' % self.name) - def requestSleep(self, sleepCompleteNotifier=None): - self.m_machineStatusUpdater.requestSleep(sleepCompleteNotifier) + def request_sleep(self, sleep_complete_notifier: Optional['SleepCompleteNotifier'] = None): + self.machine_status_updater.request_sleep(sleep_complete_notifier) - def requestWakeUp(self, wakeUpCompleteNotifier=None): - self.m_machineStatusUpdater.requestWakeUp(wakeUpCompleteNotifier) + def request_wake_up(self, wake_up_complete_notifier: Optional['SleepCompleteNotifier'] = None): + self.machine_status_updater.request_wake_up(wake_up_complete_notifier) - def getQueueMachineName(self): - return self.getCluster().getJobsState().getQueueMachine(self.m_name).getName() - assert self.m_queueName is not None - return self.m_queueName + def get_queue_machine_name(self) -> ClusterNodeId: + return self.get_cluster().get_jobs_state().get_queue_machine(self.name).get_name() + # assert self.queue_name is not None + # return self.queue_name - def getCluster(self): - return self.m_cluster + def get_cluster(self) -> 'ClusterStatus': + return self.cluster + + +# from .ClusterStatus import ClusterStatus # noqa: E402, pylint: disable=wrong-import-position +# from .SunGridEngine import SunGridEngine # noqa: E402, pylint: disable=wrong-import-position +# from .ClusterController import SleepCompleteNotifier # noqa: E402, pylint: disable=wrong-import-position diff --git a/cocluto/ClusterController/ClusterNodeStatusUpdater.py b/cocluto/ClusterController/ClusterNodeStatusUpdater.py index a218758..f50d17f 100644 --- a/cocluto/ClusterController/ClusterNodeStatusUpdater.py +++ b/cocluto/ClusterController/ClusterNodeStatusUpdater.py @@ -1,39 +1,46 @@ +from typing import TYPE_CHECKING +from typing import Optional, List import threading import time -import Lib.Util -import Lib.SimpaDbUtil -from PowerState import PowerState -from Log import logInfo, logDebug -from Util import blockingWakeUpMachine, blockingPutMachineToSleep, getPowerState, onException +import abc +from .PowerState import PowerState +from .Log import log_info, logDebug +from .Util import blocking_wake_up_machine, blocking_put_machine_to_sleep, get_power_state, on_exception +if TYPE_CHECKING: + from .ClusterNode import ClusterNodeId, ClusterNode + from .SunGridEngine import SunGridEngine -class IWakeUpCompleteNotifier: +class IWakeUpCompleteNotifier(abc.ABCMeta): """ interface for wakeup notifiers """ - def onWakeUpComplete(self): + @abc.abstractmethod + def on_wake_up_complete(self): assert False -class ISleepCompleteNotifier: +class ISleepCompleteNotifier(abc.ABCMeta): """ interface for sleep notifiers """ - def onSleepComplete(self, bSleepSucceeded): + @abc.abstractmethod + def on_sleep_complete(self, bSleepSucceeded): assert False -class IRequest: +class IRequest(abc.ABCMeta): GO_TO_SLEEP = 1 WAKE_UP = 2 CHECK_POWER_STATE = 3 def __init__(self, requestType): - self.m_type = requestType + self.type = requestType def getType(self): - return self.m_type + return self.type + @abc.abstractmethod def process(self, clusterNodeStatusUpdater): """ processes this request @@ -43,58 +50,58 @@ class IRequest: class WakeUpRequest(IRequest): - def __init__(self, wakeUpNotifier): + def __init__(self, wakeUpNotifier: IWakeUpCompleteNotifier): IRequest.__init__(self, IRequest.WAKE_UP) - self.m_wakeUpNotifier = wakeUpNotifier + self.wake_up_notifier = wakeUpNotifier def process(self, clusterNodeStatusUpdater): - assert clusterNodeStatusUpdater.m_bShouldAlwaysBeOn is False # are we attempting to wake up a machine that should always be on ? - logInfo('Handling wakeup request for %s' % clusterNodeStatusUpdater.getName()) - bSuccess = blockingWakeUpMachine(clusterNodeStatusUpdater.getName()) + assert clusterNodeStatusUpdater.should_always_be_on is False # are we attempting to wake up a machine that should always be on ? + log_info('Handling wakeup request for %s' % clusterNodeStatusUpdater.get_name()) + bSuccess = blocking_wake_up_machine(clusterNodeStatusUpdater.get_name()) assert bSuccess # activate the associated machine queue - if clusterNodeStatusUpdater.setQueueActivation(True): + if clusterNodeStatusUpdater.set_queue_activation(True): pass # all is ok else: assert False - clusterNodeStatusUpdater.m_stateLock.acquire() - clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.ON) - clusterNodeStatusUpdater.m_stateLock.release() - if self.m_wakeUpNotifier: + clusterNodeStatusUpdater.state_lock.acquire() + clusterNodeStatusUpdater.cluster_node.set_power_state(PowerState.ON) + clusterNodeStatusUpdater.state_lock.release() + if self.wake_up_notifier: logDebug('ClusterNodeStatusUpdater::run : Sending wakeup notification') - self.m_wakeUpNotifier.onWakeUpComplete() + self.wake_up_notifier.on_wake_up_complete() class SleepRequest(IRequest): - def __init__(self, sleepCompleteNotifier): + def __init__(self, sleepCompleteNotifier: ISleepCompleteNotifier): IRequest.__init__(self, IRequest.GO_TO_SLEEP) - self.m_sleepCompleteNotifier = sleepCompleteNotifier + self.sleep_complete_notifier = sleepCompleteNotifier def process(self, clusterNodeStatusUpdater): - assert not clusterNodeStatusUpdater.m_bShouldAlwaysBeOn # are we attempting to put a machine the should stay on to sleep ? - logInfo('Handling sleep request for %s' % clusterNodeStatusUpdater.getName()) - if clusterNodeStatusUpdater.setQueueActivation(False): - if clusterNodeStatusUpdater.queueIsEmpty(): - if blockingPutMachineToSleep(clusterNodeStatusUpdater.m_clusterNodeName): + assert not clusterNodeStatusUpdater.should_always_be_on # are we attempting to put a machine the should stay on to sleep ? + log_info('Handling sleep request for %s' % clusterNodeStatusUpdater.get_name()) + if clusterNodeStatusUpdater.set_queue_activation(False): + if clusterNodeStatusUpdater.queue_is_empty(): + if blocking_put_machine_to_sleep(clusterNodeStatusUpdater.cluster_node_name): # now we know that the machine is asleep - clusterNodeStatusUpdater.m_stateLock.acquire() - clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.SLEEP) - clusterNodeStatusUpdater.m_stateLock.release() - if self.m_sleepCompleteNotifier: - self.m_sleepCompleteNotifier.onSleepComplete(True) + clusterNodeStatusUpdater.state_lock.acquire() + clusterNodeStatusUpdater.cluster_node.set_power_state(PowerState.SLEEP) + clusterNodeStatusUpdater.state_lock.release() + if self.sleep_complete_notifier: + self.sleep_complete_notifier.on_sleep_complete(True) else: assert False else: # reactivate the queue - if not clusterNodeStatusUpdater.setQueueActivation(True): + if not clusterNodeStatusUpdater.set_queue_activation(True): assert False - clusterNodeStatusUpdater.m_stateLock.acquire() - clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.ON) # this is necessary to reenable the various cyclic checks that were disabled on sleep request - clusterNodeStatusUpdater.m_stateLock.release() - clusterNodeStatusUpdater.m_clusterNode.onSleepFailedBecauseAJobJustArrived() - if self.m_sleepCompleteNotifier: - self.m_sleepCompleteNotifier.onSleepComplete(False) + clusterNodeStatusUpdater.state_lock.acquire() + clusterNodeStatusUpdater.cluster_node.set_power_state(PowerState.ON) # this is necessary to reenable the various cyclic checks that were disabled on sleep request + clusterNodeStatusUpdater.state_lock.release() + clusterNodeStatusUpdater.cluster_node.on_sleep_because_a_job_just_arrived() + if self.sleep_complete_notifier: + self.sleep_complete_notifier.on_sleep_complete(False) else: assert False @@ -105,88 +112,99 @@ class CheckPowerStateRequest(IRequest): IRequest.__init__(self, IRequest.CHECK_POWER_STATE) def process(self, clusterNodeStatusUpdater): - powerState = getPowerState(clusterNodeStatusUpdater.m_clusterNodeName) - clusterNodeStatusUpdater.m_stateLock.acquire() - clusterNodeStatusUpdater.m_clusterNode.onNewPowerStateReading(powerState) - clusterNodeStatusUpdater.m_lastPowerStateCheckTime = time.time() - clusterNodeStatusUpdater.m_stateLock.release() + powerState = get_power_state(clusterNodeStatusUpdater.cluster_node_name) + clusterNodeStatusUpdater.state_lock.acquire() + clusterNodeStatusUpdater.cluster_node.on_new_power_state_reading(powerState) + clusterNodeStatusUpdater.last_power_check_state_time = time.time() + clusterNodeStatusUpdater.state_lock.release() class ClusterNodeStatusUpdater(threading.Thread): + cluster_node_name: 'ClusterNodeId' + cluster_node: 'ClusterNode' + grid_engine: 'SunGridEngine' + stop: bool + last_power_check_state_time: Optional[time.time] + check_power_state: bool + check_sensors: Optional[bool] + state_lock: threading.Lock # lock that prevents concurrent access to the state of this instance + should_always_be_on: bool # indicates that the machine should never go to sleep or off for whatever reason (eg simpatix10) + pending_requests_queue: List[IRequest] DELAY_BETWEEN_POWERSTATE_CHECKS = 5 * 60 # in seconds - def __init__(self, machineName, clusterNode, gridEngine): + def __init__(self, machineName: 'ClusterNodeId', clusterNode: 'ClusterNode', gridEngine: 'SunGridEngine'): threading.Thread.__init__(self) - self.m_clusterNodeName = machineName - self.m_clusterNode = clusterNode - self.m_gridEngine = gridEngine - self.m_bStop = False - self.m_lastPowerStateCheckTime = None # time.time() - self.m_bCheckPowerState = True - self.m_stateLock = threading.Lock() # lock that prevents concurrent access to the state of this instance - self.m_bShouldAlwaysBeOn = False # indicates that the machine should never go to sleep or off for whatever reason (eg simpatix10) - self.m_pendingRequestsQueue = [] + self.cluster_node_name = machineName + self.cluster_node = clusterNode + self.grid_engine = gridEngine + self.stop = False + self.last_power_check_state_time = None + self.check_power_state = True + self.state_lock = threading.Lock() + self.should_always_be_on = False + self.pending_requests_queue = [] + self.check_sensors = None - def getGridEngine(self): - return self.m_gridEngine + def get_grid_engine(self): + return self.grid_engine - def getName(self): - return self.m_clusterNodeName + def get_name(self): + return self.cluster_node_name - def setShouldAlwaysBeOn(self): - print('%s should always be on' % (self.getName())) - self.m_bShouldAlwaysBeOn = True + def set_should_always_be_on(self): + print('%s should always be on' % (self.get_name())) + self.should_always_be_on = True - def pushRequest(self, request): - self.m_stateLock.acquire() - self.m_pendingRequestsQueue.append(request) - self.m_stateLock.release() + def push_request(self, request: IRequest): + self.state_lock.acquire() + self.pending_requests_queue.append(request) + self.state_lock.release() - def popRequest(self): - oldestRequest = None - self.m_stateLock.acquire() - if len(self.m_pendingRequestsQueue) != 0: - oldestRequest = self.m_pendingRequestsQueue.pop(0) - self.m_stateLock.release() - return oldestRequest + def pop_request(self) -> IRequest: + oldest_request = None + self.state_lock.acquire() + if len(self.pending_requests_queue) != 0: + oldest_request = self.pending_requests_queue.pop(0) + self.state_lock.release() + return oldest_request def run(self): try: - while not self.m_bStop: + while not self.stop: # handle the oldest request - request = self.popRequest() + request = self.pop_request() if request is not None: request.process(self) # schedule a power state check if required currentTime = time.time() - if self.m_bCheckPowerState: - if not self.m_bShouldAlwaysBeOn: # don't do power checks on such machines because some current implementations of + if self.check_power_state: + if not self.should_always_be_on: # don't do power checks on such machines because some current implementations of # operations involved might cause the machine to go to sleep - if (not self.m_lastPowerStateCheckTime) or (currentTime > (self.m_lastPowerStateCheckTime + ClusterNodeStatusUpdater.DELAY_BETWEEN_POWERSTATE_CHECKS)): - self.pushRequest(CheckPowerStateRequest()) + if (not self.last_power_check_state_time) or (currentTime > (self.last_power_check_state_time + ClusterNodeStatusUpdater.DELAY_BETWEEN_POWERSTATE_CHECKS)): + self.push_request(CheckPowerStateRequest()) # pylint: disable=no-value-for-parameter time.sleep(1) except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt) - onException(exception) + on_exception(exception) - def requestSleep(self, sleepCompleteNotifier=None): - assert not self.m_bShouldAlwaysBeOn - self.pushRequest(SleepRequest(sleepCompleteNotifier)) + def request_sleep(self, sleep_complete_notifier: Optional[ISleepCompleteNotifier] = None): + assert not self.should_always_be_on + self.push_request(SleepRequest(sleep_complete_notifier)) # pylint: disable=no-value-for-parameter - def requestWakeUp(self, wakeUpNotifier=None): - assert self.m_bShouldAlwaysBeOn is False - self.pushRequest(WakeUpRequest(wakeUpNotifier)) + def request_wake_up(self, wake_up_complete_notifier: Optional[IWakeUpCompleteNotifier] = None): + assert self.should_always_be_on is False + self.push_request(WakeUpRequest(wake_up_complete_notifier)) # pylint: disable=no-value-for-parameter - def getQueueMachineName(self): - return self.m_clusterNode.getQueueMachineName() + def get_queue_machine_name(self): + return self.cluster_node.get_queue_machine_name() - def setQueueActivation(self, bEnable): + def set_queue_activation(self, bEnable: bool): """ @return true on success, false otherwise """ - return self.getGridEngine().setQueueInstanceActivation(self.getQueueMachineName(), bEnable) + return self.get_grid_engine().set_queue_instance_activation(self.get_queue_machine_name(), bEnable) - def queueIsEmpty(self): - return self.getGridEngine().queueIsEmpty(self.getName()) + def queue_is_empty(self): + return self.get_grid_engine().queue_is_empty(self.get_name()) diff --git a/cocluto/ClusterController/ClusterStatus.py b/cocluto/ClusterController/ClusterStatus.py index 132acae..61adc19 100644 --- a/cocluto/ClusterController/ClusterStatus.py +++ b/cocluto/ClusterController/ClusterStatus.py @@ -1,11 +1,14 @@ -import threading -from JobsStateUpdater import JobsStateUpdater -import Lib.Util -import Lib.SimpaDbUtil -from ClusterNode import ClusterNode -from Log import logInfo, logError -from PowerState import PowerState import time +from typing import Dict, Optional, List +import threading +from .Job import TaskUid, Task, QueueMachineId, JobRequirements +from .JobsStateUpdater import JobsStateUpdater +from .ClusterNode import ClusterNode, ClusterNodeId +from .Log import log_info, logError +from .PowerState import PowerState +from ..SimpaDbUtil import get_cluster_machines_names +from .SunGridEngine import SunGridEngine +from .JobsState import JobsState class ClusterStatus: @@ -14,196 +17,203 @@ class ClusterStatus: @param gridEngine the interface to the batch job tool (in our case it's sun grid engine) """ - def __init__(self, gridEngine): - self.m_gridEngine = gridEngine - self.m_clusterNodes = {} - self.m_lock = threading.Lock() # to prevent concurrent access to this instance - self.m_jobsStateUpdater = JobsStateUpdater(self) - self.m_jobsState = None - # self.m_controlledMachineNames = ['simpatix30'] - self.m_controlledMachineNames = [] # ['simpatix30'] + grid_engine: SunGridEngine + cluster_nodes: Dict[ClusterNodeId, ClusterNode] + lock: threading.Lock # to prevent concurrent access to this instance + jobs_state_updater: JobsStateUpdater + jobs_state: Optional[JobsState] + controlled_machine_names: List[ClusterNodeId] + + def __init__(self, grid_engine: SunGridEngine): + self.grid_engine = grid_engine + self.cluster_nodes = {} + self.lock = threading.Lock() + self.jobs_state_updater = JobsStateUpdater(self) + self.jobs_state = None + # self.controlled_machine_names = ['simpatix30'] + self.controlled_machine_names = [] # ['simpatix30'] if False: for iMachine in range(11, 40): if (iMachine == 31) or (iMachine == 32): continue # these machines don't seem to be able to go to sleep properly (bug 00000010) if (iMachine == 18): continue # this machine needs maintenance (restarting because it's very slow for an unknown reason) - self.m_controlledMachineNames.append('simpatix%d' % iMachine) - nodeNames = Lib.SimpaDbUtil.getClusterMachinesNames() - for nodeName in nodeNames: - if nodeName in self.m_controlledMachineNames: - logInfo('machine %s is under the cluster controller\'s control' % nodeName) - clusterNode = ClusterNode(nodeName, self, gridEngine) - if nodeName == 'simpatix10': - clusterNode.setShouldAlwaysBeOn() - self.m_clusterNodes[nodeName] = clusterNode + self.controlled_machine_names.append('simpatix%d' % iMachine) + node_names = get_cluster_machines_names() + for node_name in node_names: + if node_name in self.controlled_machine_names: + log_info('machine %s is under the cluster controller\'s control' % node_name) + cluster_node = ClusterNode(node_name, self, grid_engine) + if node_name == 'simpatix10': + cluster_node.set_should_always_be_on() + self.cluster_nodes[node_name] = cluster_node return - def setControlOnMachine(self, machineName, bControl): - if bControl: + def set_control_on_machine(self, machine_name: ClusterNodeId, control: bool): + if control: # add machineName under control of ClusterController - for k, v in self.m_clusterNodes.items(): - if v.getName() == machineName: + for _k, v in self.cluster_nodes.items(): + if v.get_name() == machine_name: return # nothing to do : machineName is already under the control of ClusterController - clusterNode = ClusterNode(machineName, self, self.m_gridEngine) - if machineName == 'simpatix10': - clusterNode.setShouldAlwaysBeOn() - self.m_clusterNodes[machineName] = clusterNode - clusterNode.m_machineStatusUpdater.start() + cluster_node = ClusterNode(machine_name, self, self.grid_engine) + if machine_name == 'simpatix10': + cluster_node.set_should_always_be_on() + self.cluster_nodes[machine_name] = cluster_node + cluster_node.machine_status_updater.start() else: # remove machineName from control of ClusterController - clusterNode = self.m_clusterNodes.get(machineName) - if clusterNode: - clusterNode.m_machineStatusUpdater.m_bStop = True - clusterNode.m_machineStatusUpdater.join() - self.m_clusterNodes.pop(machineName) + cluster_node = self.cluster_nodes.get(machine_name) + if cluster_node: + cluster_node.machine_status_updater.stop = True + cluster_node.machine_status_updater.join() + self.cluster_nodes.pop(machine_name) - def getGridEngine(self): - return self.m_gridEngine + def get_grid_engine(self) -> SunGridEngine: + return self.grid_engine - def getMachines(self): - return self.m_clusterNodes + def get_machines(self) -> Dict[ClusterNodeId, ClusterNode]: + return self.cluster_nodes - def startReadingThreads(self): - for k, v in self.m_clusterNodes.items(): - v.m_machineStatusUpdater.start() - self.m_jobsStateUpdater.start() + def start_reading_threads(self): + for _k, v in self.cluster_nodes.items(): + v.machine_status_updater.start() + self.jobs_state_updater.start() - def stopReadingThreads(self): - for k, v in self.m_clusterNodes.items(): - v.m_machineStatusUpdater.m_bStop = True - v.m_machineStatusUpdater.join() - self.m_jobsStateUpdater.m_bStop = True - self.m_jobsStateUpdater.join() + def stop_reading_threads(self): + for _k, v in self.cluster_nodes.items(): + v.machine_status_updater.stop = True + v.machine_status_updater.join() + self.jobs_state_updater.stop = True + self.jobs_state_updater.join() - def onNewJobsState(self, newJobsState): - # logDebug('ClusterStatus::onNewJobsState : attempting to acquire lock to access m_jobsState') - self.m_lock.acquire() - # logDebug('ClusterStatus::onNewJobsState : got lock to access m_jobsState') - self.m_jobsState = newJobsState - self.m_lock.release() + def on_new_jobs_state(self, new_jobs_state: JobsState): + # logDebug('ClusterStatus::on_new_jobs_state : attempting to acquire lock to access jobs_state') + self.lock.acquire() + # logDebug('ClusterStatus::on_new_jobs_state : got lock to access jobs_state') + self.jobs_state = new_jobs_state + self.lock.release() - def getJobsOnMachine(self, machineName): - return self.m_jobsState.getJobsOnMachine(machineName) + def get_jobs_on_machine(self, machine_name: ClusterNodeId) -> Dict[TaskUid, Task]: + return self.jobs_state.get_jobs_on_machine(machine_name) - def isReady(self): - for k, v in self.m_clusterNodes.items(): - if not v.isReady(): - logInfo('ClusterStatus::isReady : not ready because of ' + v.getName()) + def is_ready(self) -> bool: + for _k, v in self.cluster_nodes.items(): + if not v.is_ready(): + log_info('ClusterStatus::is_ready : not ready because of ' + v.get_name()) return False - # log('ClusterStatus::isReady() : '+k+' is ready') + # log('ClusterStatus::is_ready() : '+k+' is ready') # assert(False) - if self.m_jobsState is None: - logInfo('ClusterStatus::isReady : not ready because waiting for jobs state') + if self.jobs_state is None: + log_info('ClusterStatus::is_ready : not ready because waiting for jobs state') return False return True - def getIdleMachines(self): - assert self.isReady + def get_idle_machines(self) -> Dict[ClusterNodeId, ClusterNode]: + assert self.is_ready bBUG_00000009_IS_STILL_ALIVE = True if bBUG_00000009_IS_STILL_ALIVE: currentTime = time.time() fJOBS_STATE_MAX_ALLOWED_AGE = 3600 - fJobsStateAge = currentTime - self.m_jobsState.getTime() + fJobsStateAge = currentTime - self.jobs_state.get_time() if fJobsStateAge > fJOBS_STATE_MAX_ALLOWED_AGE: - logError('ClusterStatus::getIdleMachines : age of jobs state is too old (%f s). This is bug 00000009.' % (fJobsStateAge)) + logError('ClusterStatus::get_idle_machines : age of jobs state is too old (%f s). This is bug 00000009.' % (fJobsStateAge)) assert False idleMachines = {} - for machineName, machine in self.m_clusterNodes.items(): - if machine.getPowerState() == PowerState.ON: - jobsOnThisMachine = self.getJobsOnMachine(machineName) + for machineName, machine in self.cluster_nodes.items(): + if machine.get_power_state() == PowerState.ON: + jobsOnThisMachine = self.get_jobs_on_machine(machineName) if len(jobsOnThisMachine) == 0: idleMachines[machineName] = machine return idleMachines - def getPendingJobs(self): - return self.m_jobsState.getPendingJobs() + def get_pending_jobs(self) -> Dict[TaskUid, Task]: + return self.jobs_state.get_pending_jobs() - def getJobsState(self): - return self.m_jobsState + def get_jobs_state(self) -> JobsState: + return self.jobs_state - def queueMachineFitsJobRequirements(self, queueMachine, jobRequirements): - if jobRequirements.m_queues: + def queue_machine_fits_job_requirements(self, queue_machine: QueueMachineId, job_requirements: JobRequirements) -> bool: + if job_requirements.queues: bQueueIsInAllowedQueues = False - for queueName in jobRequirements.m_queues: - if queueName == queueMachine.getQueueName(): + for queueName in job_requirements.queues: + if queueName == queue_machine.get_queue_name(): bQueueIsInAllowedQueues = True if not bQueueIsInAllowedQueues: - logInfo('queueMachineFitsJobRequirements : queueMachine ' + queueMachine.getName() + ' rejected because it\'s not in the allowed queues') + log_info('queue_machine_fits_job_requirements : queue_machine ' + queue_machine.get_name() + ' rejected because it\'s not in the allowed queues') return False return True - def getEnergyConsumption(self): + def get_energy_consumption(self) -> float: """ returns an estimate of the energy consumption since the start of the cluster controller (in joules) """ fEnergyConsumption = 0.0 - for machine in self.m_clusterNodes.values(): - if machine.isReady(): # there are cases where the machine is not ready yet (for example, it's just been added to clustercontroller's control) - fEnergyConsumption += machine.getEnergyConsumption() + for machine in self.cluster_nodes.values(): + if machine.is_ready(): # there are cases where the machine is not ready yet (for example, it's just been added to clustercontroller's control) + fEnergyConsumption += machine.get_energy_consumption() return fEnergyConsumption - def getEnergySavings(self): + def get_energy_savings(self) -> float: """ returns an estimate of the energy saving since the start of the cluster controller (in joules) """ fEnergySavings = 0.0 - for machine in self.m_clusterNodes.values(): - if machine.isReady(): - fEnergySavings += machine.getEnergySavings() + for machine in self.cluster_nodes.values(): + if machine.is_ready(): + fEnergySavings += machine.get_energy_savings() return fEnergySavings - def getCurrentPowerConsumption(self): - fPowerConsumption = 0.0 - for machine in self.m_clusterNodes.values(): - if machine.isReady(): - fPowerConsumption += machine.getPowerConsumption() - return fPowerConsumption + def get_current_power_consumption(self) -> float: + power_consumption = 0.0 + for machine in self.cluster_nodes.values(): + if machine.is_ready(): + power_consumption += machine.get_power_consumption() + return power_consumption - def getCurrentPowerSavings(self): - fPowerSavings = 0.0 - for machine in self.m_clusterNodes.values(): - if machine.isReady(): - fPowerSavings += machine.getPowerConsumptionForPowerState(PowerState.ON) - machine.getPowerConsumption() - return fPowerSavings + def get_current_power_savings(self) -> float: + power_savings = 0.0 + for machine in self.cluster_nodes.values(): + if machine.is_ready(): + power_savings += machine.get_power_consumption_for_power_state(PowerState.ON) - machine.get_power_consumption() + return power_savings - def getNumControlledSlots(self): - self.m_lock.acquire() - iNumControlledSlots = 0 - for machine in self.m_clusterNodes.values(): - queueMachine = self.m_jobsState.getQueueMachine(machine.getName()) - iNumControlledSlots += queueMachine.getNumSlots() - self.m_lock.release() - return iNumControlledSlots + def get_num_controlled_slots(self) -> int: + self.lock.acquire() + num_controlled_slots = 0 + for machine in self.cluster_nodes.values(): + queue_machine = self.jobs_state.get_queue_machine(machine.get_name()) + num_controlled_slots += queue_machine.get_num_slots() + self.lock.release() + return num_controlled_slots - def getNumUsedSlots(self): - self.m_lock.acquire() - iNumUsedSlots = 0 - for machine in self.m_clusterNodes.values(): - queueMachine = self.m_jobsState.getQueueMachine(machine.getName()) - iNumUsedSlotsOnThisMachine = queueMachine.getNumSlots() - self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine) - assert iNumUsedSlotsOnThisMachine >= 0 - iNumUsedSlots += iNumUsedSlotsOnThisMachine - self.m_lock.release() - return iNumUsedSlots + def get_num_used_slots(self) -> int: + self.lock.acquire() + num_used_slots = 0 + for machine in self.cluster_nodes.values(): + queue_machine = self.jobs_state.get_queue_machine(machine.get_name()) + num_used_slots_on_this_machine = queue_machine.get_num_slots() - self.jobs_state.getNumFreeSlotsOnQueueMachine(queue_machine) + assert num_used_slots_on_this_machine >= 0 + num_used_slots += num_used_slots_on_this_machine + self.lock.release() + return num_used_slots - def getNumWastedSlots(self): - self.m_lock.acquire() + def get_num_wasted_slots(self) -> int: + self.lock.acquire() iNumWastedSlots = 0 - for machine in self.m_clusterNodes.values(): - if machine.getPowerState() == PowerState.ON: - queueMachine = self.m_jobsState.getQueueMachine(machine.getName()) - iNumWastedSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine) - self.m_lock.release() + for machine in self.cluster_nodes.values(): + if machine.get_power_state() == PowerState.ON: + queue_machine = self.jobs_state.get_queue_machine(machine.get_name()) + iNumWastedSlots += self.jobs_state.getNumFreeSlotsOnQueueMachine(queue_machine) + self.lock.release() return iNumWastedSlots - def getNumSleepingSlots(self): - self.m_lock.acquire() + def get_num_sleeping_slots(self) -> int: + self.lock.acquire() iNumSleepingSlots = 0 - for machine in self.m_clusterNodes.values(): - if machine.getPowerState() == PowerState.SLEEP: - queueMachine = self.m_jobsState.getQueueMachine(machine.getName()) - iNumSleepingSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine) - self.m_lock.release() + for machine in self.cluster_nodes.values(): + if machine.get_power_state() == PowerState.SLEEP: + queue_machine = self.jobs_state.get_queue_machine(machine.get_name()) + iNumSleepingSlots += self.jobs_state.getNumFreeSlotsOnQueueMachine(queue_machine) + self.lock.release() return iNumSleepingSlots diff --git a/cocluto/ClusterController/Install.py b/cocluto/ClusterController/Install.py index 9d8b9ca..1f7d00a 100755 --- a/cocluto/ClusterController/Install.py +++ b/cocluto/ClusterController/Install.py @@ -26,7 +26,7 @@ if __name__ == '__main__': remoteCommand += 'launchctl unload /Library/LaunchDaemons/fr.univ-rennes1.ipr.ClusterController.plist;' remoteCommand += 'launchctl load /Library/LaunchDaemons/fr.univ-rennes1.ipr.ClusterController.plist;' command = 'ssh root@'+ machineName +' "'+remoteCommand+'"' - ( returnCode, stdout, stderr ) = executeCommand( command ) + ( returnCode, stdout, stderr ) = execute_command( command ) for strSingleCommand in remoteCommand.split(';'): print(strSingleCommand) print(stdout) diff --git a/cocluto/ClusterController/Job.py b/cocluto/ClusterController/Job.py index 3b6f17f..71df961 100644 --- a/cocluto/ClusterController/Job.py +++ b/cocluto/ClusterController/Job.py @@ -1,3 +1,4 @@ +import enum from typing import Optional, Dict, List from datetime import datetime @@ -13,7 +14,7 @@ class JobStateFlags: SUSPENDED = 128 -class ParallelEnvironment: +class ParallelEnvironment(enum.Enum): MPI = 1 @@ -29,14 +30,14 @@ ResourceRequest = str # eg 'mem_available=5G' class JobRequirements: num_slots: Optional[int] architecture: Optional[str] # machine architecture - m_parallelEnvironment: Optional[int] # todo: make ParallelEnvironment an Enum + parallel_environment: Optional[ParallelEnvironment] queues: Optional[List[QueueId]] # the list of queues this job is allowed to run on resources: Optional[List[ResourceRequest]] def __init__(self): self.num_slots = None self.architecture = None - self.m_parallelEnvironment = None + self.parallel_environment = None self.queues = None self.resources = None @@ -62,10 +63,10 @@ class TaskUid: """ required to use a TaskUid as a dict hash key """ - hash = self.job_id * self.MAX_NUM_JOBS_IN_ARRAY + _hash = self.job_id * self.MAX_NUM_JOBS_IN_ARRAY if self.task_id is not None: - hash += self.task_id - return hash + _hash += self.task_id + return _hash def __eq__(self, other: 'TaskUid'): """ diff --git a/cocluto/ClusterController/JobsState.py b/cocluto/ClusterController/JobsState.py index 4251202..4f421d1 100644 --- a/cocluto/ClusterController/JobsState.py +++ b/cocluto/ClusterController/JobsState.py @@ -1,26 +1,30 @@ from typing import Dict -from .Log import * -from .Job import Task, TaskUid +from datetime import datetime +# from .Log import log_info +from .Job import Task, TaskUid, QueueMachineId +from .QueueMachine import QueueMachine class JobsState: """ - represents a snapshot of the state of SGE jobs as seen by the SGE command "qstat -f -u \*" + represents a snapshot of the state of SGE jobs as seen by the SGE command "qstat -f -u \\*" """ - tasks: Dict[TaskUid, Task] - job_array_tasks: Dict[int, Dict[TaskUid, Task]] + tasks: Dict[TaskUid, Task] # list of tasks + job_array_tasks: Dict[int, Dict[TaskUid, Task]] # a dictionary of jobs for each job array, indexed by job array id + queue_machines: Dict[QueueMachineId, QueueMachine] # list of queue machines such as allintel.q@simpatix10 + state_time: datetime # the time at which the state was snapshot def __init__(self): - self.tasks = {} # list of jobs - self.job_array_tasks = {} # a dictionary of jobs for each job array, indexed by job array id - self.m_queueMachines = {} # list of queue machines such as allintel.q@simpatix10 - self.m_stateTime = None # the time at which the state was snapshot + self.tasks = {} + self.job_array_tasks = {} + self.queue_machines = {} + self.state_time = None - def deleteAllJobs(self): + def delete_all_tasks(self): self.tasks = {} self.job_array_tasks = {} - def addTask(self, task: Task): + def add_task(self, task: Task): task_uid = task.get_id() self.tasks[task_uid] = task if task_uid.is_job_array_element(): @@ -36,56 +40,56 @@ class JobsState: def get_job_array_tasks(self, job_array_id: int) -> Dict[TaskUid, Task]: return self.job_array_tasks.get(job_array_id) - def setTime(self, stateTime): - self.m_stateTime = stateTime + def set_time(self, state_time: datetime): + self.state_time = state_time - def getTime(self): - return self.m_stateTime + def get_time(self) -> datetime: + return self.state_time - def getJobsOnMachine(self, machineName): + def get_jobs_on_machine(self, machine_name: str) -> Dict[TaskUid, Task]: jobs_on_machine = {} for task_uid, task in self.tasks.items(): - for queueMachineName, numSlots in task.get_slots().items(): - jobMachineName = queueMachineName.split('@')[1] - if jobMachineName == machineName: + for queue_machine_name, _num_slots in task.get_slots().items(): + jobMachineName = queue_machine_name.split('@')[1] + if jobMachineName == machine_name: jobs_on_machine[task_uid] = task return jobs_on_machine - def getNumFreeSlotsOnQueueMachine(self, queueMachine): - # logInfo('getNumFreeSlotsOnQueueMachine : looking for free slots on queuemachine %s' % queueMachine.getName()) + def get_num_free_slots_on_queue_machine(self, queue_machine: QueueMachine) -> int: + # log_info('getNumFreeSlotsOnQueueMachine : looking for free slots on queuemachine %s' % queueMachine.get_name()) numUsedSlots = 0 for job in self.tasks.values(): - numUsedSlotsByThisJob = job.get_slots().get(queueMachine.getName()) + numUsedSlotsByThisJob = job.get_slots().get(queue_machine.get_name()) if numUsedSlotsByThisJob is not None: - # logInfo('getNumFreeSlotsOnQueueMachine : job %d uses %d slots' % (job.getId().asStr(), numUsedSlotsByThisJob)) + # log_info('getNumFreeSlotsOnQueueMachine : job %d uses %d slots' % (job.getId().asStr(), numUsedSlotsByThisJob)) numUsedSlots += numUsedSlotsByThisJob else: - None - # logInfo('getNumFreeSlotsOnQueueMachine : job %d uses no slot' % job.getId().asStr()) - numFreeSlots = queueMachine.getNumSlots() - numUsedSlots + pass + # log_info('getNumFreeSlotsOnQueueMachine : job %d uses no slot' % job.getId().asStr()) + numFreeSlots = queue_machine.get_num_slots() - numUsedSlots assert numFreeSlots >= 0 return numFreeSlots - def addQueueMachine(self, queueMachine): - self.m_queueMachines[queueMachine.getName()] = queueMachine + def add_queue_machine(self, queue_machine: QueueMachine): + self.queue_machines[queue_machine.get_name()] = queue_machine - def getQueueMachine(self, machineName): + def get_queue_machine(self, machine_name) -> QueueMachine: """ finds the queue machine associated with a machine """ queueMachine = None - for qmName, qm in self.m_queueMachines.items(): - if qm.m_machineName == machineName: + for _qname, qm in self.queue_machines.items(): + if qm.machine_name == machine_name: assert queueMachine is None # to be sure that no more than one queue machine is on a given machine queueMachine = qm return queueMachine - def getQueueMachines(self): - return self.m_queueMachines + def get_queue_machines(self) -> Dict[QueueMachineId, QueueMachine]: + return self.queue_machines - def getPendingJobs(self): - pendingJobs = {} - for jobId, job in self.tasks.items(): - if job.is_pending(): - pendingJobs[job.get_id()] = job - return pendingJobs + def get_pending_jobs(self) -> Dict[TaskUid, Task]: + pending_jobs = {} + for _task_id, task in self.tasks.items(): + if task.is_pending(): + pending_jobs[task.get_id()] = task + return pending_jobs diff --git a/cocluto/ClusterController/JobsStateUpdater.py b/cocluto/ClusterController/JobsStateUpdater.py index 92ec0af..6980f6e 100644 --- a/cocluto/ClusterController/JobsStateUpdater.py +++ b/cocluto/ClusterController/JobsStateUpdater.py @@ -1,35 +1,39 @@ +from typing import TYPE_CHECKING import threading -import Util -import os -import traceback -import sys import time +from .Util import on_exception +if TYPE_CHECKING: + from .ClusterStatus import ClusterStatus -class JobsStateUpdater( threading.Thread ): - DELAY_BETWEEN_STATUS_CHECKS=10 # in seconds - def __init__( self, clusterStatus ): + +class JobsStateUpdater(threading.Thread): + cluster_status: 'ClusterStatus' + stop: bool + DELAY_BETWEEN_STATUS_CHECKS = 10 # in seconds + + def __init__(self, clusterStatus): threading.Thread.__init__(self) - self.m_clusterStatus = clusterStatus - self.m_bStop = False - - def getName( self ): - return 'JobsStateUpdater' - - def getGridEngine( self ): - return self.m_clusterStatus.getGridEngine() - - def updateClusterStatus( self ): - #log('JobsStateUpdater::updateClusterStatus : start') + self.cluster_status = clusterStatus + self.stop = False - jobsState = self.getGridEngine().getCurrentJobsState() + def get_name(self): + return 'JobsStateUpdater' + + def get_grid_engine(self): + return self.cluster_status.get_grid_engine() + + def updateClusterStatus(self): + # log('JobsStateUpdater::updateClusterStatus : start') + + jobsState = self.get_grid_engine().getCurrentJobsState() # update the jobs in the cluster status - self.m_clusterStatus.onNewJobsState( jobsState ) - #log('JobsStateUpdater::updateClusterStatus : end') - - def run( self ): + self.cluster_status.on_new_jobs_state(jobsState) + # log('JobsStateUpdater::updateClusterStatus : end') + + def run(self): try: - while not self.m_bStop : + while not self.stop: self.updateClusterStatus() time.sleep(JobsStateUpdater.DELAY_BETWEEN_STATUS_CHECKS) - except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt) - Util.onException(exception) + except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt) + on_exception(exception) diff --git a/cocluto/ClusterController/Log.py b/cocluto/ClusterController/Log.py index c9bb092..bd16608 100644 --- a/cocluto/ClusterController/Log.py +++ b/cocluto/ClusterController/Log.py @@ -5,10 +5,10 @@ gLogFilePath = '/tmp/ClusterController.log' # '/var/log/ClusterController.log' def log(message): - threadName = threading.currentThread().getName() + threadName = threading.currentThread().get_name() logMessage = time.asctime(time.localtime()) + ' : ' + threadName + ' : ' + message print(logMessage) - f = open(gLogFilePath, 'a+') + f = open(gLogFilePath, 'a+', encoding='utf8') assert f try: f.write(logMessage + '\n') @@ -21,11 +21,11 @@ def logDebug(message): return -def logInfo(message): +def log_info(message): log('[I]' + message) -def logWarning(message): +def log_warning(message): log('[W]' + message) diff --git a/cocluto/ClusterController/PowerState.py b/cocluto/ClusterController/PowerState.py index 1ce73da..2e83183 100644 --- a/cocluto/ClusterController/PowerState.py +++ b/cocluto/ClusterController/PowerState.py @@ -1,5 +1,8 @@ -class PowerState: +import enum + + +class PowerState(enum.Enum): UNKNOWN = 0 OFF = 1 ON = 2 diff --git a/cocluto/ClusterController/QstatParser.py b/cocluto/ClusterController/QstatParser.py index f03cc5f..3d80e61 100644 --- a/cocluto/ClusterController/QstatParser.py +++ b/cocluto/ClusterController/QstatParser.py @@ -2,7 +2,6 @@ import io import re from .JobsState import JobsState from .QueueMachine import QueueMachine, QueueMachineStateFlags -from .Util import * from .Log import logError from .Job import JobStateFlags, TaskUid, Task, ParallelEnvironment, JobState import logging @@ -54,13 +53,13 @@ class QstatParser: assert False, 'unhandled queue machine state flag :"' + c + '"' return queueMachineState - def parseQstatOutput(self, qstatOutput, cluster_domain: str = 'ipr.univ-rennes1.fr'): + def parse_qstat_output(self, qstat_output: str, cluster_domain: str = 'ipr.univ-rennes1.fr'): """ parses result of command 'qstat -f -u \\* -pri' cluster_domain: network domain of the cluster (eg 'ipr.univ-rennes.fr'). This information is missing from qstat's output and is used to form the fully qualified domain name of the cluster machines. """ - logging.debug('qstatOutput type : %s' % type(qstatOutput)) + logging.debug('qstatOutput type : %s', type(qstat_output)) def parse_pending_tasks(task_ranges_sequence): """ @@ -99,13 +98,13 @@ class QstatParser: # --------------------------------------------------------------------------------- # main.q@physix88.ipr.univ-renne BIP 0/0/36 14.03 lx-amd64 # TODO: fix this properly by parsing the output of 'qstat -f -u \* -xml' instead of 'qstat -f -u \*' - qstatOutput = re.sub(r'\.ipr\.univ[^ ]*', f'.{cluster_domain}', qstatOutput) + qstat_output = re.sub(r'\.ipr\.univ[^ ]*', f'.{cluster_domain}', qstat_output) jobsState = JobsState() - f = io.StringIO(qstatOutput) + f = io.StringIO(qstat_output) line = f.readline() - currentQueueMachine = None - bInPendingJobsSection = False + current_queue_machine = None + in_pending_jobs_section = False # examples of job line : # 43521 0.55108 Confidiso3 aghoufi r 08/19/2009 18:40:09 1 # a typical job line in the pending jobs section looks like this : @@ -120,42 +119,42 @@ class QstatParser: # ntckts The job's ticket amount in normalized fashion. # ppri The job's -p priority as specified by the user. - jobRegularExp = re.compile(r'^[ ]*(?P[^ ]+)[ ]+(?P[0-9.]+)[ ]+(?P[0-9.]+)[ ]+(?P[0-9.]+)[ ]+(?P[0-9.]+)[ ]+(?P-?[0-9]+)[ ]+(?P[^ ]+)[ ]+(?P[^ ]+)[ ]+(?P[^ ]+)[ ]+(?P[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+(?P[0-9]+)[ ]+(?P[^\n]*)[\s]*$') + job_regular_exp = re.compile(r'^[ ]*(?P[^ ]+)[ ]+(?P[0-9.]+)[ ]+(?P[0-9.]+)[ ]+(?P[0-9.]+)[ ]+(?P[0-9.]+)[ ]+(?P-?[0-9]+)[ ]+(?P[^ ]+)[ ]+(?P[^ ]+)[ ]+(?P[^ ]+)[ ]+(?P[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+(?P[0-9]+)[ ]+(?P[^\n]*)[\s]*$') # example of machine line : # allintel.q@simpatix34.univ-ren BIP 0/6/8 6.00 darwin-x86 - machineRegularExp = re.compile(r'^(?P[^@]+)@(?P[^ ]+)[ ]+(?P[^ ]+)[ ]+(?P[^/]+)/(?P[^/]+)/(?P[^ ]+)[ ]+(?P[^ ]+)[\s]+(?P[^ ]+)[\s]+(?P[^\s]*)') - pendingJobsHeaderRegularExp = re.compile('^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS[?]*') + machine_regular_exp = re.compile(r'^(?P[^@]+)@(?P[^ ]+)[ ]+(?P[^ ]+)[ ]+(?P[^/]+)/(?P[^/]+)/(?P[^ ]+)[ ]+(?P[^ ]+)[\s]+(?P[^ ]+)[\s]+(?P[^\s]*)') + pending_jobs_header_regular_exp = re.compile('^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS[?]*') while len(line) > 0: # print line # check if the current line is a line describing a job running on a machine - matchObj = jobRegularExp.match(line) - if matchObj: + match_obj = job_regular_exp.match(line) + if match_obj: # we are dealing with a job line - if not bInPendingJobsSection: - assert currentQueueMachine + if not in_pending_jobs_section: + assert current_queue_machine # log('QstatParser::parseQstatOutput : jobId = "'+matchObj.group('jobId')+'"') - job_id = int(matchObj.group('jobId')) - logging.debug('iJobId = %d' % job_id) - jobState = self.parseJobState(matchObj.group('jobStatus')) - strJobArrayDetails = matchObj.group('jobArrayDetails') - bIsJobArray = (len(strJobArrayDetails) != 0) + job_id = int(match_obj.group('jobId')) + logging.debug('iJobId = %d', job_id) + job_state = self.parseJobState(match_obj.group('jobStatus')) + job_array_details = match_obj.group('jobArrayDetails') + is_job_array = (len(job_array_details) != 0) # logDebug('strJobArrayDetails = "%s", bIsJobArray=%d' % (strJobArrayDetails, int(bIsJobArray))) # each element of a job array is treated as a separate job for the sake of simplicity. # For these elements, the job id in sge sense is the same, but they are different in this program's sense task_ids = range(0, 1) # just one element, unless it's a job array - if bIsJobArray: - if bInPendingJobsSection: - task_ids = parse_pending_tasks(strJobArrayDetails) + if is_job_array: + if in_pending_jobs_section: + task_ids = parse_pending_tasks(job_array_details) else: # we are in the running jobs section, and here we expect the strJobArrayDetails to just contain the index of the job array element - iJobArrayElementIndex = int(strJobArrayDetails) - assert iJobArrayElementIndex != 0 # sge does not allow element indices to be 0 - task_ids = range(iJobArrayElementIndex, iJobArrayElementIndex + 1) - logging.debug('task_ids = %s' % task_ids) + task_id = int(job_array_details) + assert task_id != 0 # sge does not allow element indices to be 0 + task_ids = range(task_id, task_id + 1) + logging.debug('task_ids = %s', task_ids) for task_id in task_ids: - logging.debug('task_id = %s' % task_id) + logging.debug('task_id = %s', task_id) task_uid = None - if bIsJobArray: + if is_job_array: task_uid = TaskUid(job_id, task_id) else: task_uid = TaskUid(job_id) @@ -165,57 +164,57 @@ class QstatParser: # this job hasn't been encountered yet in the output of qstat ... # we could either be in the pending jobs section or in the running jobs section task = Task(task_uid) - jobsState.addTask(task) - task.set_state(jobState) - strJobStartOrSubmitTime = matchObj.group('jobStartOrSubmitTime') - jobStartOrSubmitTime = time.strptime(strJobStartOrSubmitTime, '%m/%d/%Y %H:%M:%S') - if bInPendingJobsSection: - task.get_submit_time(jobStartOrSubmitTime) + jobsState.add_task(task) + task.set_state(job_state) + job_start_or_submit_time_as_str = match_obj.group('jobStartOrSubmitTime') + job_start_or_submit_time = time.strptime(job_start_or_submit_time_as_str, '%m/%d/%Y %H:%M:%S') + if in_pending_jobs_section: + task.get_submit_time(job_start_or_submit_time) else: - task.set_start_time(jobStartOrSubmitTime) - task.set_owner(matchObj.group('jobOwner')) - task.set_script_name(matchObj.group('jobScriptName')) - if bInPendingJobsSection: - task.set_num_required_slots(int(matchObj.group('numSlots'))) + task.set_start_time(job_start_or_submit_time) + task.set_owner(match_obj.group('jobOwner')) + task.set_script_name(match_obj.group('jobScriptName')) + if in_pending_jobs_section: + task.set_num_required_slots(int(match_obj.group('numSlots'))) else: - assert not bInPendingJobsSection # if we are in the pending jobs section, the job should be new - if not bInPendingJobsSection: - task.add_slots(currentQueueMachine.getName(), int(matchObj.group('numSlots'))) + assert not in_pending_jobs_section # if we are in the pending jobs section, the job should be new + if not in_pending_jobs_section: + task.add_slots(current_queue_machine.get_name(), int(match_obj.group('numSlots'))) else: # the current line does not describe a job - if not bInPendingJobsSection: + if not in_pending_jobs_section: # check if this line describes the status of a machine - matchObj = machineRegularExp.match(line) - if matchObj: - queueName = matchObj.group('queueName') - machineName = matchObj.group('machineName') - queueMachine = QueueMachine(queueName, machineName) + match_obj = machine_regular_exp.match(line) + if match_obj: + queue_name = match_obj.group('queueName') + machine_name = match_obj.group('machineName') + queue_machine = QueueMachine(queue_name, machine_name) # log(line) # log('matchObj.group(queueTypeString) :' + matchObj.group('queueTypeString')) # log('matchObj.group(numTotalSlots) :' + matchObj.group('numTotalSlots')) - queueMachine.setNumSlots(int(matchObj.group('numTotalSlots'))) - queueMachine.setNumUsedSlots(int(matchObj.group('numUsedSlots'))) - strCpuLoad = matchObj.group('cpuLoad') - if strCpuLoad != '-NA-': - queueMachine.setCpuLoad(float(strCpuLoad)) + queue_machine.set_num_slots(int(match_obj.group('numTotalSlots'))) + queue_machine.set_num_used_slots(int(match_obj.group('numUsedSlots'))) + cpu_load_as_str = match_obj.group('cpuLoad') + if cpu_load_as_str != '-NA-': + queue_machine.set_cpu_load(float(cpu_load_as_str)) - strQueueMachineState = matchObj.group('queueMachineStatus') - queueMachine.setState(self.parseQueueMachineState(strQueueMachineState)) + queue_machine_state_as_str = match_obj.group('queueMachineStatus') + queue_machine.set_state(self.parseQueueMachineState(queue_machine_state_as_str)) # log('QstatParser::parseQstatOutput : queueName = "'+matchObj.group('queueName')+'"') # log('QstatParser::parseQstatOutput : machineName = "'+matchObj.group('machineName')+'"') - currentQueueMachine = queueMachine - jobsState.addQueueMachine(queueMachine) + current_queue_machine = queue_machine + jobsState.add_queue_machine(queue_machine) else: - matchObj = pendingJobsHeaderRegularExp.match(line) - if matchObj: - bInPendingJobsSection = True - currentQueueMachine = None + match_obj = pending_jobs_header_regular_exp.match(line) + if match_obj: + in_pending_jobs_section = True + current_queue_machine = None else: pass else: # we are in a pending jobs section - matchObj = re.match('^[#]+$', line) - if not matchObj: + match_obj = re.match('^[#]+$', line) + if not match_obj: # unexpected line print('line = "' + line + '"') assert False @@ -223,11 +222,11 @@ class QstatParser: f.close() return jobsState - def parseJobDetails(self, qstatOutput, job): + def parse_job_details(self, qstat_output: str, task: Task): """ adds to job the details parsed from the output of the "qstat -j " command """ - f = io.StringIO(qstatOutput) + f = io.StringIO(qstat_output) line = f.readline() fieldRegularExp = re.compile('^(?P[^:]+):[ ]+(?P[?]*)$') while len(line) > 0: @@ -238,20 +237,20 @@ class QstatParser: fieldName = matchObj.group('fieldName') strFieldValue = matchObj.group('fieldValue') if fieldName == 'job_number': - assert job.getId().asStr() == strFieldValue + assert task.getId().asStr() == strFieldValue elif fieldName == 'hard_queue_list': allowedQueues = strFieldValue.split(',') assert len(allowedQueues) > 0 - job.m_jobRequirements.m_queues = allowedQueues + task.job_requirements.queues = allowedQueues elif fieldName == 'parallel environment': # the value could be 'ompi range: 32' matchObj = re.match('ompi range: (?P[0-9]+)[?]*', strFieldValue) if matchObj: - job.m_jobRequirements.m_parallelEnvironment = ParallelEnvironment.MPI + task.job_requirements.parallel_environment = ParallelEnvironment.MPI else: assert False else: - # ignore he other fields - None + # ignore the other fields + pass line = f.readline() f.close() diff --git a/cocluto/ClusterController/QueueMachine.py b/cocluto/ClusterController/QueueMachine.py index 0ade95f..36d2a2d 100644 --- a/cocluto/ClusterController/QueueMachine.py +++ b/cocluto/ClusterController/QueueMachine.py @@ -1,3 +1,7 @@ +from typing import Optional +from .Job import QueueMachineId +from .ClusterNode import ClusterNodeId + class QueueMachineStateFlags: # DISABLED = 1 # the queue machine is disabled @@ -12,70 +16,73 @@ class QueueMachine: """ a QueueMachine instance represents a given SGE queue on a given machine (eg allintel.q@simpatix10) """ - def __init__(self, queueName, machineName): - self.m_queueName = queueName - self.m_machineName = machineName - self.m_numSlots = None - self.m_numUsedSlots = None - self.m_fCpuLoad = None - self.m_stateFlags = 0 - self.m_strDisableMessage = '' + queue_name: str + machine_name: ClusterNodeId + num_slots: Optional[int] + num_used_slots: Optional[int] + cpu_load: Optional[float] + state_flags: int + disable_message: str - def getName(self): + def __init__(self, queueName, machineName): + self.queue_name = queueName + self.machine_name = machineName + self.num_slots = None + self.num_used_slots = None + self.cpu_load = None + self.state_flags = 0 + self.disable_message = '' + + def get_name(self) -> QueueMachineId: """ returns the name of the machine queue (such as allintel.q@simpatix10) """ - return self.m_queueName + '@' + self.m_machineName + return self.queue_name + '@' + self.machine_name - def getQueueName(self): - return self.m_queueName + def get_queue_name(self) -> str: + return self.queue_name - def getMachineName(self): - return self.m_machineName + def get_machine_name(self) -> str: + return self.machine_name - def setNumSlots(self, numSlots): - self.m_numSlots = numSlots + def set_num_slots(self, num_slots: int): + self.num_slots = num_slots - def setNumUsedSlots(self, numSlots): - self.m_numUsedSlots = numSlots + def set_num_used_slots(self, num_slots: int): + self.num_used_slots = num_slots - def getNumSlots(self): - assert self.m_numSlots is not None - return self.m_numSlots + def get_num_slots(self) -> int: + assert self.num_slots is not None + return self.num_slots - def getNumUsedSlots(self): - assert self.m_numUsedSlots is not None - return self.m_numUsedSlots + def get_num_used_slots(self) -> int: + assert self.num_used_slots is not None + return self.num_used_slots - def setCpuLoad(self, fCpuLoad): - self.m_fCpuLoad = fCpuLoad + def set_cpu_load(self, cpu_load: float): + self.cpu_load = cpu_load - def cpuLoadIsAvailable(self): - return self.m_fCpuLoad is not None + def cpu_load_is_available(self) -> bool: + return self.cpu_load is not None - def getCpuLoad(self): - assert self.m_fCpuLoad is not None - return self.m_fCpuLoad + def get_cpu_load(self) -> float: + assert self.cpu_load is not None + return self.cpu_load - def setState(self, state): - self.m_stateFlags = state + def set_state(self, state: int): + self.state_flags = state - def isDisabled(self): - return self.m_stateFlags & QueueMachineStateFlags.DISABLED + def is_disabled(self) -> bool: + return self.state_flags & QueueMachineStateFlags.DISABLED - def isInErrorState(self): - return self.m_stateFlags & QueueMachineStateFlags.ERROR + def is_in_error_state(self) -> bool: + return self.state_flags & QueueMachineStateFlags.ERROR - def isResponding(self): - return not (self.m_stateFlags & QueueMachineStateFlags.UNKNOWN) + def is_responding(self) -> bool: + return not (self.state_flags & QueueMachineStateFlags.UNKNOWN) - def isInAlarmState(self): - return self.m_stateFlags & QueueMachineStateFlags.ALARM + def is_in_alarm_state(self) -> bool: + return self.state_flags & QueueMachineStateFlags.ALARM - def isSuspended(self): - return self.m_stateFlags & QueueMachineStateFlags.SUSPENDED - """ - def getStateAsString(self): - assert(self.m_strState is not None) - return self.m_strState - """ + def is_suspended(self) -> bool: + return self.state_flags & QueueMachineStateFlags.SUSPENDED diff --git a/cocluto/ClusterController/SlotAllocator.py b/cocluto/ClusterController/SlotAllocator.py index 43b66a0..a8928c9 100644 --- a/cocluto/ClusterController/SlotAllocator.py +++ b/cocluto/ClusterController/SlotAllocator.py @@ -1,21 +1,24 @@ -from PowerState import PowerState -from Log import logInfo + +import abc import time import copy +from .PowerState import PowerState +from .Log import log_info class Slot: def __init__(self): - self.m_queueMachine = None - self.m_numSlots = None - self.m_job = None # job for which this slot is allocated + self.queue_machine = None + self.num_slots = None + self.jobs = None # job for which this slot is allocated -class SlotAllocator: +class SlotAllocator(abc.ABCMeta): """ a class that defines a strategy for allocating free slots for the given pending jobs """ - def getMachinesThatNeedWakeUp(self, pendingJobs, clusterState): + @abc.abstractmethod + def get_machinesThatNeedWakeUp(self, pendingJobs, clusterState): """ returns the list of machines that need to wake up to make pending jobs running """ @@ -23,44 +26,45 @@ class SlotAllocator: class SimpleSlotAllocator(SlotAllocator): - def getMachinesThatNeedWakeUp(self, pendingJobs, clusterState): + + def get_machinesThatNeedWakeUp(self, pendingJobs, clusterState): machinesThatNeedWakeUp = {} highestPriorityPendingJob = pendingJobs.values()[0] - logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : looking for free slots for job ' + highestPriorityPendingJob.getId().asStr()) + log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : looking for free slots for job ' + highestPriorityPendingJob.getId().asStr()) numFreeSlots = {} # contains the number of free slots for each queueMachine - for queueMachine in clusterState.getJobsState().getQueueMachines().values(): - numFreeSlots[queueMachine] = clusterState.getJobsState().getNumFreeSlotsOnQueueMachine(queueMachine) - logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : init numFreeSlots[%s] with %d ' % (queueMachine.getName(), numFreeSlots[queueMachine])) - remainingNumSlotsToAllocate = highestPriorityPendingJob.m_jobRequirements.m_numSlots - logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate) + for queueMachine in clusterState.get_jobs_state().get_queue_machines().values(): + numFreeSlots[queueMachine] = clusterState.get_jobs_state().getNumFreeSlotsOnQueueMachine(queueMachine) + log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : init numFreeSlots[%s] with %d ' % (queueMachine.get_name(), numFreeSlots[queueMachine])) + remainingNumSlotsToAllocate = highestPriorityPendingJob.job_requirements.num_slots + log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate) # first look in running machines if there are available slots - for queueMachine in clusterState.getJobsState().getQueueMachines().values(): - logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.getName()) - machine = clusterState.getMachines()[queueMachine.getMachineName()] - if machine.getPowerState() == PowerState.ON: - if clusterState.queueMachineFitsJobRequirements(queueMachine, highestPriorityPendingJob.m_jobRequirements): + for queueMachine in clusterState.get_jobs_state().get_queue_machines().values(): + log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.get_name()) + machine = clusterState.get_machines()[queueMachine.get_machine_name()] + if machine.get_power_state() == PowerState.ON: + if clusterState.queue_machine_fits_job_requirements(queueMachine, highestPriorityPendingJob.job_requirements): numSlotsAllocatedOnThisMachine = min(numFreeSlots[queueMachine], remainingNumSlotsToAllocate) - logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : found %d slots on already running %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.getMachineName())) + log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : found %d slots on already running %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.get_machine_name())) remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine numFreeSlots[queueMachine] -= numSlotsAllocatedOnThisMachine - logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate) + log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate) assert remainingNumSlotsToAllocate >= 0 if remainingNumSlotsToAllocate == 0: break if remainingNumSlotsToAllocate > 0: # now look into machines that are asleep - for queueMachine in clusterState.getJobsState().getQueueMachines().values(): - logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.getName()) - machine = clusterState.getMachines()[queueMachine.getMachineName()] - if machine.getPowerState() == PowerState.SLEEP: - if clusterState.queueMachineFitsJobRequirements(queueMachine, highestPriorityPendingJob.m_jobRequirements): + for queueMachine in clusterState.get_jobs_state().get_queue_machines().values(): + log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : examining queueMachine %s ' % queueMachine.get_name()) + machine = clusterState.get_machines()[queueMachine.get_machine_name()] + if machine.get_power_state() == PowerState.SLEEP: + if clusterState.queue_machine_fits_job_requirements(queueMachine, highestPriorityPendingJob.job_requirements): numSlotsAllocatedOnThisMachine = min(numFreeSlots[queueMachine], remainingNumSlotsToAllocate) - logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : found %d slots on sleeping %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.getMachineName())) + log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : found %d slots on sleeping %s ' % (numSlotsAllocatedOnThisMachine, queueMachine.get_machine_name())) remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine numFreeSlots[queueMachine] -= numSlotsAllocatedOnThisMachine - machinesThatNeedWakeUp[machine.getName()] = machine - logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate) + machinesThatNeedWakeUp[machine.get_name()] = machine + log_info('SimpleSlotAllocator::get_machinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate) assert remainingNumSlotsToAllocate >= 0 if remainingNumSlotsToAllocate == 0: break @@ -75,9 +79,9 @@ class DecoupledSlotAllocator(SlotAllocator): Instead, it uses a very simple strategy : it wakes up all the machines periodically to allow jobs to get in. """ def __init__(self): - self.m_delayBetweenPeriodicChecks = -1 # in seconds. Disable periodic checks by setting this to -1 - self.m_lastCheckTime = time.time() - self.m_lastClusterState = None + self.delay_between_periodic_checks = -1 # in seconds. Disable periodic checks by setting this to -1 + self.last_check_time = time.time() + self.last_cluster_state = None def jobsStateHasChanged(self, newClusterState): """ @@ -85,26 +89,24 @@ class DecoupledSlotAllocator(SlotAllocator): to start (provided all machines are enabled) """ oldJobs = {} - if self.m_lastClusterState: - oldJobs = self.m_lastClusterState.m_jobsState.m_jobs - newJobs = newClusterState.m_jobsState.m_jobs + if self.last_cluster_state: + oldJobs = self.last_cluster_state.jobs_state.jobs + newJobs = newClusterState.jobs_state.jobs bJobsHaveChanged = False oldJobsOnly = oldJobs.copy() # shallow copy # print 'oldJobs : ', oldJobs # print 'newJobs : ', newJobs - """ - print 'self.m_lastClusterState', self.m_lastClusterState - print 'newClusterState', newClusterState - if self.m_lastClusterState: - print 'self.m_lastClusterState.m_jobsState', self.m_lastClusterState.m_jobsState - print 'newClusterState.m_jobsState', newClusterState.m_jobsState - print 'id(self.m_lastClusterState) : ', id(self.m_lastClusterState) - print 'id(newClusterState) : ', id(newClusterState) - print 'len(oldJobs) : ', len(oldJobs) - print 'len(newJobs) : ', len(newJobs) - print 'id(oldJobs) : ', id(oldJobs) - print 'id(newJobs) : ', id(newJobs) - """ + # print 'self.last_cluster_state', self.last_cluster_state + # print 'newClusterState', newClusterState + # if self.last_cluster_state: + # print 'self.last_cluster_state.jobs_state', self.last_cluster_state.jobs_state + # print 'newClusterState.jobs_state', newClusterState.jobs_state + # print 'id(self.last_cluster_state) : ', id(self.last_cluster_state) + # print 'id(newClusterState) : ', id(newClusterState) + # print 'len(oldJobs) : ', len(oldJobs) + # print 'len(newJobs) : ', len(newJobs) + # print 'id(oldJobs) : ', id(oldJobs) + # print 'id(newJobs) : ', id(newJobs) for newJob in newJobs.values(): # logDebug('DecoupledSlotAllocator::jobsStateHasChanged newJob id=%s' % newJob.getId().asStr()) if newJob.getId() in oldJobs: @@ -112,36 +114,36 @@ class DecoupledSlotAllocator(SlotAllocator): del oldJobsOnly[newJob.getId()] else: # ah ... a new job has arrived - logInfo('A new job (jobId =%s) has been detected ' % newJob.getId().asStr()) + log_info('A new job (jobId =%s) has been detected ' % newJob.getId().asStr()) bJobsHaveChanged = True if len(oldJobsOnly) != 0: for oldJob in oldJobsOnly.values(): - logInfo('Job (jobId =%s) has finished' % oldJob.getId().asStr()) + log_info('Job (jobId =%s) has finished' % oldJob.getId().asStr()) # at least one old job has finished, freeing some slots bJobsHaveChanged = True return bJobsHaveChanged - def getMachinesThatNeedWakeUp(self, pendingJobs, clusterState): + def get_machinesThatNeedWakeUp(self, pendingJobs, clusterState): machinesThatNeedWakeUp = {} - bJobsStateHasChanged = self.jobsStateHasChanged(clusterState) + bJobsStateHasChanged = self.jobsStateHasChanged(clusterState) # pylint: disable=no-value-for-parameter currentTime = time.time() # we do periodic checks to detect changes in cluster state that are not detected by jobsStateHasChanged # for example changes in the requirements, in the allocation policy, etc... bItsTimeForPeriodicCheck = False - if self.m_delayBetweenPeriodicChecks > 0: - bItsTimeForPeriodicCheck = (currentTime - self.m_lastCheckTime) > self.m_delayBetweenPeriodicChecks + if self.delay_between_periodic_checks > 0: + bItsTimeForPeriodicCheck = (currentTime - self.last_check_time) > self.delay_between_periodic_checks if bJobsStateHasChanged or bItsTimeForPeriodicCheck: if bJobsStateHasChanged: - logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep because jobs state has changed') + log_info('DecoupledSlotAllocator::get_machinesThatNeedWakeUp : waking up machines that are asleep because jobs state has changed') else: - logInfo('DecoupledSlotAllocator::getMachinesThatNeedWakeUp : waking up machines that are asleep for periodic check (to be sure pending jobs get a chance to start)') - for queueMachine in clusterState.getJobsState().getQueueMachines().values(): - if queueMachine.getMachineName() in clusterState.getMachines(): + log_info('DecoupledSlotAllocator::get_machinesThatNeedWakeUp : waking up machines that are asleep for periodic check (to be sure pending jobs get a chance to start)') + for queueMachine in clusterState.get_jobs_state().get_queue_machines().values(): + if queueMachine.get_machine_name() in clusterState.get_machines(): # this means that the machine is under the cluster controller's control - machine = clusterState.getMachines()[queueMachine.getMachineName()] - if machine.getPowerState() == PowerState.SLEEP: - machinesThatNeedWakeUp[machine.getName()] = machine - self.m_lastCheckTime = currentTime - self.m_lastClusterState = copy.copy(clusterState) - # print 'self.m_lastClusterState', self.m_lastClusterState + machine = clusterState.get_machines()[queueMachine.get_machine_name()] + if machine.get_power_state() == PowerState.SLEEP: + machinesThatNeedWakeUp[machine.get_name()] = machine + self.last_check_time = currentTime + self.last_cluster_state = copy.copy(clusterState) + # print 'self.last_cluster_state', self.last_cluster_state return machinesThatNeedWakeUp diff --git a/cocluto/ClusterController/SunGridEngine.py b/cocluto/ClusterController/SunGridEngine.py index 56b0be4..851fe31 100644 --- a/cocluto/ClusterController/SunGridEngine.py +++ b/cocluto/ClusterController/SunGridEngine.py @@ -1,58 +1,58 @@ import time -from Util import executeProgram -from QstatParser import QstatParser -from Log import logDebug, logWarning +from .Util import execute_program +from .QstatParser import QstatParser +from .Log import logDebug, log_warning class SunGridEngine: - def getCurrentJobsState(self): + def get_current_job_state(self): bBUG_00000009_IS_STILL_ALIVE = True if bBUG_00000009_IS_STILL_ALIVE: logDebug('Querying the current state of jobs') - returnCode = -1 - delayBetweenAttemps = 5 # in seconds - while returnCode != 0: + return_code = -1 + delay_between_attempts = 5 # in seconds + while return_code != 0: command = ['qstat', '-f', '-u', '*'] - (returnCode, qstatOutput, stderr) = executeProgram(command) - if returnCode != 0: - logWarning('command "%s" failed (returnCode = %d, stdout="%s", stderr="%s"). Retrying in %d seconds' % (' '.join(command), returnCode, qstatOutput, stderr, delayBetweenAttemps)) - time.sleep(delayBetweenAttemps) + (return_code, qstat_output, stderr) = execute_program(command) + if return_code != 0: + log_warning('command "%s" failed (returnCode = %d, stdout="%s", stderr="%s"). Retrying in %d seconds' % (' '.join(command), return_code, qstat_output, stderr, delay_between_attempts)) + time.sleep(delay_between_attempts) if bBUG_00000009_IS_STILL_ALIVE: logDebug('Just got current state of jobs') - jobsState = QstatParser().parseQstatOutput(qstatOutput) - jobsState.setTime(time.time()) + jobs_state = QstatParser().parse_qstat_output(qstat_output) + jobs_state.set_time(time.time()) # read the requirements for pending jobs (which parallel environment, which queue, which architecture) from sge if False: # no need for job details at the moment and since it's very slow, it's been disabled - for unused_jobId, job in jobsState.getPendingJobs().items(): - (returnCode, stdout, stderr) = executeProgram(['qstat', '-j', job.getId().asStr()]) - assert returnCode != 0, 'prout' - QstatParser().parseJobDetails(stdout, job) + for unused_jobId, job in jobs_state.get_pending_jobs().items(): + (return_code, stdout, stderr) = execute_program(['qstat', '-j', job.getId().asStr()]) + assert return_code != 0, 'prout' + QstatParser().parse_job_details(stdout, job) - return jobsState + return jobs_state - def setQueueInstanceActivation(self, strQueueInstanceName, bEnable): + def set_queue_instance_activation(self, queue_instance_name: str, enable: bool): argument = 'd' - if bEnable: + if enable: argument = 'e' bBUG_00000269_IS_STILL_ALIVE = True # for some reason, qmod -d (and maybe any sge command) could fail with error: commlib error: can't connect to service (Address already in use) - delayBetweenAttemps = 5 # in seconds + delay_between_attempts = 5 # in seconds while True: - errorCode, unused_stdout, unused_stderr = executeProgram(['qmod', '-' + argument, strQueueInstanceName]) + error_code, unused_stdout, unused_stderr = execute_program(['qmod', '-' + argument, queue_instance_name]) if bBUG_00000269_IS_STILL_ALIVE: # if the command failed, try again - if errorCode == 0: + if error_code == 0: break - time.sleep(delayBetweenAttemps) + time.sleep(delay_between_attempts) else: break - return (errorCode == 0) + return (error_code == 0) - def queueIsEmpty(self, strMachineName): - (returnCode, qstatOutput, unused_stderr) = executeProgram(['qstat', '-f', '-u', '*']) + def queue_is_empty(self, machine_name: str): + (returnCode, qstat_output, unused_stderr) = execute_program(['qstat', '-f', '-u', '*']) assert returnCode == 0 - jobsState = QstatParser().parseQstatOutput(qstatOutput) - jobs = jobsState.getJobsOnMachine(strMachineName) + jobs_state = QstatParser().parse_qstat_output(qstat_output) + jobs = jobs_state.get_jobs_on_machine(machine_name) return (len(jobs) == 0) diff --git a/cocluto/ClusterController/TestBug00000003.py b/cocluto/ClusterController/TestBug00000003.py index f1fd4b7..2b10d2c 100755 --- a/cocluto/ClusterController/TestBug00000003.py +++ b/cocluto/ClusterController/TestBug00000003.py @@ -1,24 +1,21 @@ #!/usr/bin/env python -import sys -sys.path.insert(0, '..') -from Log import logInfo -import Util -from PowerState import PowerState -from HTMLParser import HTMLParser +from .Log import log_info +from .Util import get_power_state, blocking_put_machine_to_sleep, blocking_wake_up_machine, execute_command, execute_ipmi_command +from .PowerState import PowerState def Test0000(): - logInfo('Testing bug 00000003 if a series of wake up, goto sleep can shutdown a machine') - strTargetMachineName = 'simpatix12' - ePowerState = Util.getPowerState(strTargetMachineName) + log_info('Testing bug 00000003 if a series of wake up, goto sleep can shutdown a machine') + strTarget_machine_name = 'simpatix12' + ePowerState = get_power_state(strTarget_machine_name) while True: if ePowerState == PowerState.ON: - bSuccess = Util.blockingPutMachineToSleep(strTargetMachineName) + bSuccess = blocking_put_machine_to_sleep(strTarget_machine_name) assert bSuccess - bSuccess = Util.blockingPutMachineToSleep(strTargetMachineName) + bSuccess = blocking_put_machine_to_sleep(strTarget_machine_name) ePowerState = PowerState.SLEEP elif ePowerState == PowerState.SLEEP: - bSuccess = Util.blockingWakeUpMachine(strTargetMachineName) + bSuccess = blocking_wake_up_machine(strTarget_machine_name) assert bSuccess ePowerState = PowerState.ON else: @@ -26,30 +23,30 @@ def Test0000(): def Test0001(): - logInfo('Testing bug 00000003 : could it be caused by a sleep and a power on at the same tim ?') - strTargetMachineName = 'simpatix12' - ePowerState = Util.getPowerState(strTargetMachineName) + log_info('Testing bug 00000003 : could it be caused by a sleep and a power on at the same tim ?') + strTarget_machine_name = 'simpatix12' + ePowerState = get_power_state(strTarget_machine_name) if ePowerState == PowerState.SLEEP: - bSuccess = Util.blockingWakeUpMachine(strTargetMachineName) + bSuccess = blocking_wake_up_machine(strTarget_machine_name) assert bSuccess ePowerState = PowerState.ON assert ePowerState == PowerState.ON - Util.executeCommand("ssh %s 'pmset sleepnow'" % strTargetMachineName) - bSuccess = Util.blockingWakeUpMachine(strTargetMachineName) + execute_command("ssh %s 'pmset sleepnow'" % strTarget_machine_name) + bSuccess = blocking_wake_up_machine(strTarget_machine_name) assert bSuccess def Test0002(): - logInfo('Testing bug 00000003 : could it be caused by a power on quickly followed by a sleep ?') - strTargetMachineName = 'simpatix12' - ePowerState = Util.getPowerState(strTargetMachineName) + log_info('Testing bug 00000003 : could it be caused by a power on quickly followed by a sleep ?') + strTarget_machine_name = 'simpatix12' + ePowerState = get_power_state(strTarget_machine_name) if ePowerState == PowerState.ON: - bSuccess = Util.blockingWakeUpMachine(strTargetMachineName) + bSuccess = blocking_wake_up_machine(strTarget_machine_name) assert bSuccess ePowerState = PowerState.SLEEP assert ePowerState == PowerState.SLEEP - Util.executeIpmiCommand(strTargetMachineName, 'chassis power on') - Util.executeCommand("ssh %s 'pmset sleepnow'" % strTargetMachineName) + execute_ipmi_command(strTarget_machine_name, 'chassis power on') + execute_command("ssh %s 'pmset sleepnow'" % strTarget_machine_name) if __name__ == '__main__': diff --git a/cocluto/ClusterController/Util.py b/cocluto/ClusterController/Util.py index f71b013..d1bf9b8 100644 --- a/cocluto/ClusterController/Util.py +++ b/cocluto/ClusterController/Util.py @@ -1,80 +1,81 @@ -# import .Util -# import ..SimpaDbUtil -from .Log import logDebug, logInfo, logWarning, logError -from .PowerState import PowerState, PowerStateToStr import re import io import os import traceback import sys +import time +from ..Util import execute_program as exe_prog +from ..Util import execute_command as exe_comm +from ..Util import send_text_mail +from ..SimpaDbUtil import getLightOutManagementIpAddress, is_machine_responding +from .Log import logDebug, log_info, log_warning, logError +from .PowerState import PowerState, PowerStateToStr -def executeProgram(astrArguments): +def execute_program(astrArguments): bBUG_00000008_IS_STILL_ACTIVE = True if bBUG_00000008_IS_STILL_ACTIVE: - logDebug('executeProgram : program = [%s]' % (','.join(astrArguments))) - (returnCode, stdout, stderr) = Lib.Util.executeProgram(astrArguments) + logDebug('execute_program : program = [%s]' % (','.join(astrArguments))) + (returnCode, stdout, stderr) = exe_prog(astrArguments) if bBUG_00000008_IS_STILL_ACTIVE: - logDebug('executeCommand : return code of [%s] = %d' % (','.join(astrArguments), returnCode)) + logDebug('execute_command : return code of [%s] = %d' % (','.join(astrArguments), returnCode)) # for debugging purpose, log info in case the command failed if returnCode != 0: - logDebug('executeCommand : return code of [%s] = %d' % (','.join(astrArguments), returnCode)) - logDebug('executeCommand : stdout of [%s] = %s' % (','.join(astrArguments), stdout)) - logDebug('executeCommand : stderr of [%s] = %s' % (','.join(astrArguments), stderr)) + logDebug('execute_command : return code of [%s] = %d' % (','.join(astrArguments), returnCode)) + logDebug('execute_command : stdout of [%s] = %s' % (','.join(astrArguments), stdout)) + logDebug('execute_command : stderr of [%s] = %s' % (','.join(astrArguments), stderr)) return (returnCode, stdout, stderr) -def executeCommand(command): - # logDebug('executeCommand : command = ' + command) - (returnCode, stdout, stderr) = Lib.Util.executeCommand(command) - # logDebug('executeCommand : return code of "'+command+'" = '+str(returnCode)) +def execute_command(command): + # logDebug('execute_command : command = ' + command) + (returnCode, stdout, stderr) = exe_comm(command) + # logDebug('execute_command : return code of "'+command+'" = '+str(returnCode)) return (returnCode, stdout, stderr) -def executeIpmiCommand(machineName, ipmiCommandArgs): - lomIpAddress = Lib.SimpaDbUtil.getLightOutManagementIpAddress(machineName) +def execute_ipmi_command(machineName, ipmiCommandArgs): + lomIpAddress = getLightOutManagementIpAddress(machineName) lomPasswordFilepath = '/usr/local/etc/LightOutManagementPassword.txt' astrProgram = ['ipmitool', '-U', 'admin', '-H', lomIpAddress, '-f', lomPasswordFilepath] astrProgram.extend(ipmiCommandArgs) - # print 'executeIpmiCommand' + # print 'execute_ipmi_command' # print astrProgram bBUG_00000005_IS_STILL_ACTIVE = True if bBUG_00000005_IS_STILL_ACTIVE: # bug 00000005 causes ipmitool to randomly fail for no apparent reason (two consecutive calls might give different errors, these errors not always being timeouts). Therefore we try and try again, until the command succeeds. If we don't do this, cluster controller keeps stopping because ipmi commands fail. The effect of this hack is that the UNPLUGGED power state is no longer detected; therefore, with this hack, cluster controller is expecting all machines to be plugged. bCommandSucceeded = False while not bCommandSucceeded: - (returnCode, stdout, stderr) = executeProgram(astrProgram) + (returnCode, stdout, stderr) = execute_program(astrProgram) if returnCode == 0: bCommandSucceeded = True else: - logWarning('the command "%s" failed. Retrying a bit later' % ' '.join(astrProgram)) + log_warning('the command "%s" failed. Retrying a bit later' % ' '.join(astrProgram)) time.sleep(5) # wait for 5 seconds before the next attempt, in order not to saturate activity else: - (returnCode, stdout, stderr) = executeProgram(astrProgram) - """ - sh-3.2# ipmitool -U admin -H 129.20.27.220 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State' - Unabled to establish a session with the BMC. - Command failed due to insufficient resources for session (0xFFFEF901) - -> this error means that the number of active conections to the BMC has reached the maximum (usually 5). + (returnCode, stdout, stderr) = execute_program(astrProgram) + # sh-3.2# ipmitool -U admin -H 129.20.27.220 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State' + # Unabled to establish a session with the BMC. + # Command failed due to insufficient resources for session (0xFFFEF901) + # -> this error means that the number of active conections to the BMC has reached the maximum (usually 5). - sh-3.2# ipmitool -U admin -H 129.20.27.212 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State' - Unabled to establish a session with the BMC. - Command failed due to Unknown (0xFFFEF923) (0xFFFEF923) + # sh-3.2# ipmitool -U admin -H 129.20.27.212 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State' + # Unabled to establish a session with the BMC. + # Command failed due to Unknown (0xFFFEF923) (0xFFFEF923) - sh-3.2# ipmitool -U admin -H 129.20.27.212 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State' - Unabled to establish a session with the BMC. - Command failed due to Timeout (0xFFFEF9C3) - """ + # sh-3.2# ipmitool -U admin -H 129.20.27.212 -f /usr/local/etc/LightOutManagementPassword.txt sensor get 'ACPI State' + # Unabled to establish a session with the BMC. + # Command failed due to Timeout (0xFFFEF9C3) return (returnCode, stdout, stderr) -def getPowerState(machineName): +def get_power_state(machineName): ePowerState = PowerState.UNKNOWN bPowerStateRead = False iNumFailedAttempts = 0 while not bPowerStateRead: - (returnCode, stdout, stderr) = executeIpmiCommand(machineName, ['sensor', 'get', 'ACPI State']) + (returnCode, stdout, _stderr) = execute_ipmi_command(machineName, ['sensor', 'get', 'ACPI State']) if returnCode == 0: matchObj = re.search(r'\[(?PS[0-9][^\:]*)\:', stdout) bBUG_00000002_IS_STILL_ACTIVE = True @@ -83,7 +84,7 @@ def getPowerState(machineName): # the following warning has been commented out because it pollutes the logs and apparently # it's a 'feature' of all 4 core machines : if the machine is woken up using ipmitool, then # no power on event is logged ... - # logWarning('degenerate ipmitool output for machine %s (see bug 00000002). Assuming power in on because that''s what I noticed when I had the case.' % machineName) + # log_warning('degenerate ipmitool output for machine %s (see bug 00000002). Assuming power in on because that''s what I noticed when I had the case.' % machineName) return PowerState.ON else: assert matchObj @@ -103,31 +104,31 @@ def getPowerState(machineName): iMAX_NUM_ATTEMPTS = 5 iNumFailedAttempts += 1 if iNumFailedAttempts < iMAX_NUM_ATTEMPTS: - logWarning('failed to read the power state of %s. I\'ll try a again a bit later....' % machineName) + log_warning('failed to read the power state of %s. I\'ll try a again a bit later....' % machineName) time.sleep(5) else: - logWarning('failed to read the power state of %s too many times. I assume this machine is unplugged' % machineName) + log_warning('failed to read the power state of %s too many times. I assume this machine is unplugged' % machineName) ePowerState = PowerState.UNPLUGGED # too many attempts failed ... I guess it's because the machine is unplugged bPowerStateRead = True return ePowerState -def wakeUpMachine(machineName): +def wake_up_machine(machineName): """ this method seems more reliable than wake on lan (sometimes, sending wake on lan packet seems to have no effect) @return true on success, false otherwise @note I once had this method failing for no obvious reason.. maybe this command does not succeed if the machine is in a transition state """ - (returnCode, stdout, stderr) = executeIpmiCommand(machineName, ['chassis', 'power', 'on']) + (returnCode, _stdout, _stderr) = execute_ipmi_command(machineName, ['chassis', 'power', 'on']) bSuccess = (returnCode == 0) # this command can fail if the machine is manually unplugged for example return bSuccess -def blockingPutMachineToSleep(machineName): +def blocking_put_machine_to_sleep(machineName): """ @return true on success, false otherwise """ - logInfo('putting machine %s to sleep...' % machineName) + log_info('putting machine %s to sleep...' % machineName) iMaxNumAttempts = 5 bSuccess = False bBUG_239_IS_STILL_ALIVE = True @@ -135,36 +136,36 @@ def blockingPutMachineToSleep(machineName): # note : each sleep order is not actually succeeding (god knows why). Therefore, we need to try again and again. while not bSuccess: # note : pmset must be executed as root - (returnCode, stdout, stderr) = executeProgram(['ssh', machineName, 'pmset sleepnow']) + (_returnCode, _stdout, _stderr) = execute_program(['ssh', machineName, 'pmset sleepnow']) # check if the machine actually went to sleep iMaxGoToSleepDuration = 30 # in seconds iDelay = 0 while iDelay < iMaxGoToSleepDuration: time.sleep(5) iDelay += 5 - ePowerState = getPowerState(machineName) + ePowerState = get_power_state(machineName) if ePowerState == PowerState.SLEEP: - logInfo('machine %s is now sleeping (put to sleep succeeded)' % machineName) + log_info('machine %s is now sleeping (put to sleep succeeded)' % machineName) return True else: if ePowerState != PowerState.ON: - logWarning('unexpectedly, powerState of %s is %s' % (machineName, PowerStateToStr(ePowerState))) + log_warning('unexpectedly, powerState of %s is %s' % (machineName, PowerStateToStr(ePowerState))) assert ePowerState == PowerState.ON iAttempt += 1 if iAttempt > iMaxNumAttempts: if bBUG_239_IS_STILL_ALIVE: - logWarning('the attempt to put %s to sleep failed too many times (probably because of bug 239 (machine is in a weird state : power on but no ssh possible) ?)... giving up. ' % (machineName)) + log_warning('the attempt to put %s to sleep failed too many times (probably because of bug 239 (machine is in a weird state : power on but no ssh possible) ?)... giving up. ' % (machineName)) return False else: - logWarning('the attempt to put %s to sleep failed too many times... giving up' % (machineName)) + log_warning('the attempt to put %s to sleep failed too many times... giving up' % (machineName)) return False else: - logWarning('the attempt to put %s to sleep failed... trying again' % (machineName)) + log_warning('the attempt to put %s to sleep failed... trying again' % (machineName)) return True -def blockingWakeUpMachine(machineName): - logInfo('waking up machine %s...' % machineName) +def blocking_wake_up_machine(machineName): + log_info('waking up machine %s...' % machineName) numAttempts = 0 bWakeUpFailed = True while bWakeUpFailed: # try more than once because sometimes for an unknown reason, the wake up order is ignored by the machine ... to be investigated @@ -172,45 +173,45 @@ def blockingWakeUpMachine(machineName): iNumWakeUpAttempts = 0 bWakeUpMachineSucceeded = False while not bWakeUpMachineSucceeded: - bWakeUpMachineSucceeded = wakeUpMachine(machineName) + bWakeUpMachineSucceeded = wake_up_machine(machineName) iNumWakeUpAttempts += 1 # the previous command can fail if the machine is already in a transition # in that case we try sevral times bevire giving up if not bWakeUpMachineSucceeded: if iNumWakeUpAttempts < iMaxNumWakeUpAttempts: iDelay = 5 - logWarning('wake up attempt %d of %s failed... I\'ll try again in %d seconds' % (iNumWakeUpAttempts, machineName, iDelay)) + log_warning('wake up attempt %d of %s failed... I\'ll try again in %d seconds' % (iNumWakeUpAttempts, machineName, iDelay)) time.sleep(iDelay) else: - logWarning('wake up attempt %d of %s failed too many times... giving up' % (iNumWakeUpAttempts, machineName)) + log_warning('wake up attempt %d of %s failed too many times... giving up' % (iNumWakeUpAttempts, machineName)) return False # couldn't wake up to machine for whatever reason bWakeUpFailed = False # wait until the machine is operational WAKEUPTIMEOUT = 5 * 60 # max number of seconds allowed for a machine to be alive after a wakeup request wakeUpToAliveDuration = 0 - while not Lib.SimpaDbUtil.isMachineResponding(machineName): + while not is_machine_responding(machineName): time.sleep(5) wakeUpToAliveDuration += 5 if wakeUpToAliveDuration > WAKEUPTIMEOUT: # the wake up failed for whatever reason (power state changed manually ? wake up order got lost ?) - logWarning('%s took too long (more than %d seconds) to respond after a successful wakeup request.' % (machineName, WAKEUPTIMEOUT)) + log_warning('%s took too long (more than %d seconds) to respond after a successful wakeup request.' % (machineName, WAKEUPTIMEOUT)) bWakeUpFailed = True break if bWakeUpFailed: numAttempts += 1 if numAttempts >= 2: - logWarning('giving up waking up %s because the wake up request succeeded but the machine never actually came alive (and this too many times)' % (machineName)) + log_warning('giving up waking up %s because the wake up request succeeded but the machine never actually came alive (and this too many times)' % (machineName)) return False # power state changed manually ? else: - logWarning('attempting to wake up %s one more time' % (machineName)) + log_warning('attempting to wake up %s one more time' % (machineName)) else: # wake up completed - logInfo('Waking up of machine %s completed successfully' % machineName) + log_info('Waking up of machine %s completed successfully' % machineName) return True -def onException(exception): +def on_exception(exception): sys.stdout.flush() strExceptionType = type(exception) strMessage = 'exception %s : %s\n' % (strExceptionType, exception.message) @@ -224,11 +225,10 @@ def onException(exception): try: # I had the case (see bugzilla 234) where an assert failed in a child thread, but the main process kept going. I suspect that it was caused - # by a failure of sendTextMail... that's why I've embedded the sendmail inside a try except block, so that is this operation fails, then the + # by a failure of send_text_mail... that's why I've embedded the sendmail inside a try except block, so that is this operation fails, then the # kill of the main process is still executed. - Lib.Util.sendTextMail('ClusterController ', 'guillaume.raffy@univ-rennes1.fr', 'ClusterController has stopped because of an exception', strMessage) + send_text_mail('ClusterController ', 'guillaume.raffy@univ-rennes1.fr', 'ClusterController has stopped because of an exception', strMessage) except BaseException: logError("Could not send the email to notify the administrator that cluster controller failed") - pass - executeCommand('kill -9 %d' % os.getpid()) # stop other threads immediately + execute_command('kill -9 %d' % os.getpid()) # stop other threads immediately exit() diff --git a/cocluto/ClusterController/WebServer.py b/cocluto/ClusterController/WebServer.py index 7862850..1f5738d 100644 --- a/cocluto/ClusterController/WebServer.py +++ b/cocluto/ClusterController/WebServer.py @@ -1,109 +1,109 @@ -#Copyright Jon Berg , turtlemeat.com - -import string,cgi,time +# Copyright Jon Berg , turtlemeat.com +import cgi +import time from os import curdir, sep -from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer # pylint:disable=import-error import threading -import Util -#import pri -from urlparse import urlparse, parse_qs +# import pri +from urlparse import urlparse, parse_qs # pylint:disable=import-error import xml.dom.minidom -#>>> url = 'http://example.com/?foo=bar&one=1' -#>>> parse_qs(urlparse(url).query) -#{'foo': ['bar'], 'one': ['1']} +from .Util import on_exception +# >>> url = 'http://example.com/?foo=bar&one=1' +# >>> parse_qs(urlparse(url).query) +# {'foo': ['bar'], 'one': ['1']} class MyHandler(BaseHTTPRequestHandler): def do_GET(self): try: - paramsDict=parse_qs(urlparse(self.path).query) + paramsDict = parse_qs(urlparse(self.path).query) if self.path.endswith(".html"): - f = open(curdir + sep + self.path) #self.path has /test.html -#note that this potentially makes every file on your computer readable by the internet + f = open(curdir + sep + self.path, encoding='utf8') # self.path has /test.html + # note that this potentially makes every file on your computer readable by the internet self.send_response(200) - self.send_header('Content-type', 'text/html') + self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(f.read()) f.close() return - if self.path.endswith(".esp"): #our dynamic content + if self.path.endswith(".esp"): # our dynamic content self.send_response(200) - self.send_header('Content-type', 'text/html') + self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write("hey, today is the" + str(time.localtime()[7])) self.wfile.write(" day in the year " + str(time.localtime()[0])) return - if self.path.endswith("ShowControlledMachines"): #http://simpatix10.univ-rennes1.fr:8080/ShowControlledMachines + if self.path.endswith("ShowControlledMachines"): # http://simpatix10.univ-rennes1.fr:8080/ShowControlledMachines self.send_response(200) - self.send_header('Content-type', 'text/xml') + self.send_header('Content-type', 'text/xml') self.end_headers() - + # Create the minidom document doc = xml.dom.minidom.Document() - + # Create the root element controlledMachinesElement = doc.createElement("ControlledMachines") doc.appendChild(controlledMachinesElement) - for machine in self.server.m_clusterController.m_clusterStatus.m_clusterNodes.values(): + for machine in self.server.cluster_controller.cluster_status.cluster_nodes.values(): # Create the main element controlledMachineElement = doc.createElement("Machine") - controlledMachineElement.setAttribute("name", machine.getName()) + controlledMachineElement.setAttribute("name", machine.get_name()) controlledMachinesElement.appendChild(controlledMachineElement) # Print our newly created XML self.wfile.write(doc.toprettyxml(indent=" ")) return - if urlparse(self.path).path == '/SetControlOnMachine': #http://simpatix10.univ-rennes1.fr:8080/SetControlOnMachine?machineName=simpatix30&control=1 + if urlparse(self.path).path == '/SetControlOnMachine': # http://simpatix10.univ-rennes1.fr:8080/SetControlOnMachine?machineName=simpatix30&control=1 machineName = paramsDict['machineName'][0] bControl = (paramsDict['control'][0] == '1') - self.server.m_clusterController.setControlOnMachine(machineName, bControl) + self.server.cluster_controller.set_control_on_machine(machineName, bControl) self.send_response(200) - self.send_header('Content-type', 'text/html') + self.send_header('Content-type', 'text/html') self.end_headers() - if bControl == True: - self.wfile.write("%s is now controlled by ClusterController" % machineName) + if bControl is True: + self.wfile.write("%s is now controlled by ClusterController" % machineName) else: - self.wfile.write("%s is no longer controlled by ClusterController" % machineName) + self.wfile.write("%s is no longer controlled by ClusterController" % machineName) return - return - except IOError: - self.send_error(404,'File Not Found: %s' % self.path) - + self.send_error(404, 'File Not Found: %s' % self.path) def do_POST(self): - global rootnode try: ctype, pdict = cgi.parse_header(self.headers.getheader('content-type')) if ctype == 'multipart/form-data': - query=cgi.parse_multipart(self.rfile, pdict) + query = cgi.parse_multipart(self.rfile, pdict) self.send_response(301) - + self.end_headers() upfilecontent = query.get('upfile') - print "filecontent", upfilecontent[0] - self.wfile.write("POST OK.

"); - self.wfile.write(upfilecontent[0]); - - except : + print("filecontent", upfilecontent[0]) + self.wfile.write("POST OK.

") + self.wfile.write(upfilecontent[0]) + + except BaseException: pass -class WebServerThread( threading.Thread ): - def __init__( self, clusterController ): - threading.Thread.__init__(self) - #self.m_clusterController = clusterController - self.m_bStop = False - self.m_httpServer = HTTPServer(('', 8080), MyHandler) - self.m_httpServer.m_clusterController = clusterController - def run( self ): - try: - while not self.m_bStop: - self.m_httpServer.handle_request() - #self.m_httpServer.serve_forever() - except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt) - self.m_httpServer.socket.close() - Util.onException(exception) +class WebServerThread(threading.Thread): + stop: bool + http_server: HTTPServer + + def __init__(self, clusterController): + threading.Thread.__init__(self) + # self.cluster_controller = clusterController + self.stop = False + self.http_server = HTTPServer(('', 8080), MyHandler) + self.http_server.cluster_controller = clusterController + + def run(self): + try: + while not self.stop: + self.http_server.handle_request() + # self.http_server.serve_forever() + except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt) + self.http_server.socket.close() + on_exception(exception) diff --git a/cocluto/Ipmi/ClusterNodeSensorsReadings.py b/cocluto/Ipmi/ClusterNodeSensorsReadings.py index 1cc988d..6628d8b 100644 --- a/cocluto/Ipmi/ClusterNodeSensorsReadings.py +++ b/cocluto/Ipmi/ClusterNodeSensorsReadings.py @@ -12,35 +12,35 @@ class ClusterNodeSensorsReadings: POWERSTATE_SLEEP=3 """ def __init__(self, clusterNodeName): - self.m_clusterNodeName = clusterNodeName - self.m_sensors = {} - # self.m_powerState = ClusterNodeStatus.POWERSTATE_UNKNOWN + self.cluster_node_name = clusterNodeName + self.sensors = {} + # self.power_state = ClusterNodeStatus.POWERSTATE_UNKNOWN return def addSensor(self, sensor): - self.m_sensors[sensor.m_name] = sensor + self.sensors[sensor.name] = sensor def dump(self): - for key, sensor in self.m_sensors.items(): + for key, sensor in self.sensors.items(): sensor.dump() return - # def getPowerState(self): - # return self.m_powerState + # def get_power_state(self): + # return self.power_state def getLowestTemperature(self): # log('ClusterNodeSensorsReadings::getLowestTemperature : start') lowestTemperature = 0.0 lowestTemperatureIsDefined = False - for key, sensor in self.m_sensors.items(): + for key, sensor in self.sensors.items(): # log('ClusterNodeSensorsReadings::getLowestTemperature : start') if sensor.typeName() == 'Temperature': - sensor.m_temperature + sensor.temperature if lowestTemperatureIsDefined: - if sensor.m_temperature < lowestTemperature: - lowestTemperature = sensor.m_temperature + if sensor.temperature < lowestTemperature: + lowestTemperature = sensor.temperature else: - lowestTemperature = sensor.m_temperature + lowestTemperature = sensor.temperature lowestTemperatureIsDefined = True assert lowestTemperatureIsDefined # log('ClusterNodeSensorsReadings::getLowestTemperature : end') diff --git a/cocluto/Ipmi/IpmiTool202Parser.py b/cocluto/Ipmi/IpmiTool202Parser.py index 1882744..d033aca 100644 --- a/cocluto/Ipmi/IpmiTool202Parser.py +++ b/cocluto/Ipmi/IpmiTool202Parser.py @@ -31,12 +31,12 @@ class IpmiTool202Parser: rpms = self.parseFanSensorOutput(f) if temperature is not None: sensor = FanSensor(sensorName) - sensor.m_rpms = rpms + sensor.rpms = rpms elif sensorType == 'Temperature': temperature = self.parseTemperatureSensorOutput(f) if temperature is not None: sensor = TemperatureSensor(sensorName) - sensor.m_temperature = temperature + sensor.temperature = temperature else: # ignoring other sensors sensor = None diff --git a/cocluto/Ipmi/IpmiTool218Parser.py b/cocluto/Ipmi/IpmiTool218Parser.py index d399c2b..548dadc 100644 --- a/cocluto/Ipmi/IpmiTool218Parser.py +++ b/cocluto/Ipmi/IpmiTool218Parser.py @@ -22,10 +22,10 @@ class IpmiTool218Parser: sensor = None if sensorUnit == 'degrees C': sensor = TemperatureSensor(sensorName) - sensor.m_temperature = float(sensorValue) + sensor.temperature = float(sensorValue) elif sensorUnit == 'RPM': sensor = FanSensor(sensorName) - sensor.m_rpms = float(sensorValue) + sensor.rpms = float(sensorValue) else: None if sensor: diff --git a/cocluto/Ipmi/Sensor.py b/cocluto/Ipmi/Sensor.py index a631d03..521ceac 100644 --- a/cocluto/Ipmi/Sensor.py +++ b/cocluto/Ipmi/Sensor.py @@ -1,23 +1,40 @@ +from typing import Optional + + class Sensor: + def __init__(self, sensorName): - self.m_name = sensorName - self.m_isValid = True # false if this sensor is not actually present on the target machine + self.name = sensorName + self.is_valid = True # false if this sensor is not actually present on the target machine return + def dump(self): - print self.m_name - + print(self.name) + + class FanSensor(Sensor): + rpms: Optional[float] + def __init__(self, sensorName): Sensor.__init__(self, sensorName) + self.rpms = None + def dump(self): - print 'Fan \'', self.m_name, '\' rpm=',self.m_rpms + print('Fan \'', self.name, '\' rpm=', self.rpms) + def typeName(self): return 'Fan' + class TemperatureSensor(Sensor): + temperature: Optional[float] + def __init__(self, sensorName): Sensor.__init__(self, sensorName) + self.temperature = None + def dump(self): - print 'Temperature \'', self.m_name, '\' temperature=',self.m_temperature + print('Temperature \'', self.name, '\' temperature=', self.temperature) + def typeName(self): return 'Temperature' diff --git a/cocluto/SimpaDbUtil.py b/cocluto/SimpaDbUtil.py index 513520c..217cf53 100644 --- a/cocluto/SimpaDbUtil.py +++ b/cocluto/SimpaDbUtil.py @@ -8,31 +8,31 @@ else: import re from .wol import wake_on_lan import os -from .Util import executeProgram, executeCommand, log +from .Util import execute_program, execute_command, log import abc import sqlite3 from .mysql2sqlite import mysql_to_sqlite -def isMachineResponding(machineName): - (returnCode, stdout, stderr) = executeProgram(['ping', '-o', '-t', '1', machineName]) - # log( 'isMachineResponding : result of command %s : %d' % (command, returnCode) ) +def is_machine_responding(machineName): + (returnCode, stdout, stderr) = execute_program(['ping', '-o', '-t', '1', machineName]) + # log( 'is_machine_responding : result of command %s : %d' % (command, returnCode) ) if returnCode == 0: return True else: - bMachineNameIsNotKnown = (returnCode == 68) + # bMachineNameIsNotKnown = (returnCode == 68) bMachineIsNotResponding = (returnCode == 2) if bMachineIsNotResponding is False: bBUG_00000004_IS_STILL_ALIVE = True if bBUG_00000004_IS_STILL_ALIVE is True and returnCode == 142: - log('isMachineResponding : bug00000004 Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName)) + log('is_machine_responding : bug00000004 Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName)) # don't stop the program until we understand bug00000004 elif bBUG_00000004_IS_STILL_ALIVE is True and returnCode == -14: # I had this error code on 07/09/2009 20:38 but I don't know yet what that means - log('isMachineResponding : bug00000004 Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName)) + log('is_machine_responding : bug00000004 Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName)) # don't stop the program until we understand bug00000004 else: - log('isMachineResponding : Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName)) + log('is_machine_responding : Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName)) assert False return False @@ -46,7 +46,6 @@ class ISqlDatabaseBackend(object): """ :param str sql_query: the sql query to perform """ - pass class RemoteMysqlDb(ISqlDatabaseBackend): @@ -70,7 +69,7 @@ class RemoteMysqlDb(ISqlDatabaseBackend): :param str sql_query: the sql query to perform """ self._conn.query(sql_query) - rows = conn.store_result() + rows = self._conn.store_result() return rows @@ -87,7 +86,7 @@ class SqlFile(ISqlDatabaseBackend): # - the file is stored on a solid state disk try: os.remove(sqlite_db_path) - except: + except BaseException: pass check_same_thread = False # this is to prevent the following error when run from apache/django : SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 139672342353664 and this is thread id 139672333960960 @@ -95,7 +94,7 @@ class SqlFile(ISqlDatabaseBackend): # If set False, the returned connection may be shared across multiple threads. When using multiple threads with the same connection writing operations should be serialized by the user to avoid data corruption # I hope it's safe here but I'm not 100% sure though. Anyway, if the database gets corrupt, it not a big deal since this memory resident database gets reconstructed from the sql file... self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread) - with open(str(self._sql_file_path), 'r') as f: # str conversion has been added to support older versions of python in which open don't accept arguments of type Path + with open(str(self._sql_file_path), 'r', encoding='utf8') as f: # str conversion has been added to support older versions of python in which open don't accept arguments of type Path sql = f.read() # watch out for built-in `str` # print(sql) self._cur = self._con.cursor() @@ -111,7 +110,6 @@ class SqlFile(ISqlDatabaseBackend): """ :param str sql_query: the sql query to perform """ - pass self._cur.execute(sql_query) rows = self._cur.fetchall() return rows @@ -196,7 +194,7 @@ def getLightOutManagementIpAddress(machineName): return ipAddress -def getClusterMachinesNames(): +def get_cluster_machines_names(): clusterMachinesNames = [] conn = MySQLdb.connect('simpatix10', 'simpadb_reader', '', 'simpadb') assert conn @@ -223,14 +221,12 @@ def machineSupportsIpmi(machineName): def putToSleep(machineName): # note : pmset must be executed as root - (returnCode, stdout, stderr) = executeCommand(['ssh', machineName, 'pmset sleepnow']) - """ - print returnCode - print 'stdout :' - print stdout - print 'stderr :' - print stderr - """ + (returnCode, stdout, _stderr) = execute_command(['ssh', machineName, 'pmset sleepnow']) + # print returnCode + # print 'stdout :' + # print stdout + # print 'stderr :' + # print stderr assert returnCode == 0 # check if the command succeeded by looking at the output (that's the only way I found) f = StringIO.StringIO(stdout) @@ -255,7 +251,7 @@ def isNonRespondingMachineSleeping(machineName): """ wakeUp(machineName) time.sleep(120) - if isMachineResponding(machineName): + if is_machine_responding(machineName): putToSleep(machineName) time.sleep(30) # allow a little time to make sure the machine is ready to receive other wake on lan messages return True @@ -264,11 +260,9 @@ def isNonRespondingMachineSleeping(machineName): if __name__ == '__main__': - """ - for i in range(30): - machineName = 'simpatix%d' % (i+10) - print 'lom ip of %s is %s' % (machineName, getLightOutManagementIpAddress(machineName)) - """ + # for i in range(30): + # machineName = 'simpatix%d' % (i+10) + # print 'lom ip of %s is %s' % (machineName, getLightOutManagementIpAddress(machineName)) wakeUp('simpatix21') # print putToSleep('simpatix13') # print isNonRespondingMachineSleeping('simpatix13') diff --git a/cocluto/Util.py b/cocluto/Util.py index 66b7b12..e5d57b4 100644 --- a/cocluto/Util.py +++ b/cocluto/Util.py @@ -1,6 +1,5 @@ import time import subprocess -import io import re import logging # from wol import * @@ -16,7 +15,7 @@ else: from email.mime.text import MIMEText -def sendTextMail(strFrom, to, strSubject, text): +def send_text_mail(strFrom, to, strSubject, text): # from = "SimpaCluster " mail = MIMEText(text) mail['From'] = strFrom @@ -30,12 +29,14 @@ def sendTextMail(strFrom, to, strSubject, text): class Error(Exception): + message: str + def __init__(self, strMessage): - self.m_strMessage = strMessage + self.message = strMessage def getHostName(): - (returnCode, stdout, stderr) = executeProgram(['hostname', '-s']) + (returnCode, stdout, stderr) = execute_program(['hostname', '-s']) if returnCode != 0: raise Error(stderr) strHostName = re.sub(r"\n", "", stdout) @@ -46,18 +47,18 @@ def log(message): print(time.asctime(time.localtime()) + ' : ' + message) -def executeProgram(astrArguments): - # log('executeProgram : program [%s]' % (','.join(astrArguments))) +def execute_program(astrArguments): + # log('execute_program : program [%s]' % (','.join(astrArguments))) popen = subprocess.Popen(astrArguments, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # bufsize=1 seems to prevent deadlocks that happen 50% the time stdout, stderr = popen.communicate() # popen.wait() result = (popen.returncode, stdout.decode(), stderr) - # log('executeProgram : command %s popen.pid = %d' % (astrArguments[0], popen.pid)) + # log('execute_program : command %s popen.pid = %d' % (astrArguments[0], popen.pid)) # os.kill(popen.pid, signal.SIGTERM) return result -def executeCommand(command): +def execute_command(command): """ executes the shell command such as 'set x=1; myprog $x' """ @@ -69,25 +70,25 @@ def executeCommand(command): return result -def executeCommandOn(target_machine_fqdn: str, command: str, user: str = None): +def execute_commandOn(target_machine_fqdn: str, command: str, user: str = None): """ execute command on a local or remote machine (using ssh then) :param str user: if not None, the user that should be used to execute the command (instead of the current user) """ - logging.debug("executing %s on %s as %s" % (command, target_machine_fqdn, user)) + logging.debug("executing %s on %s as %s", command, target_machine_fqdn, user) if getHostName() == target_machine_fqdn.split('.')[0]: if user is not None: # su -c "ls -l /tmp" graffy - result = executeCommand("su -c '%s' %s" % (command, user)) + result = execute_command("su -c '%s' %s" % (command, user)) else: - result = executeCommand(command) + result = execute_command(command) else: if user is not None: target = '%s@%s' % (user, target_machine_fqdn) else: target = target_machine_fqdn - result = executeProgram(['ssh', target, "%s" % command]) - logging.debug("finished executing %s on %s as %s" % (command, target_machine_fqdn, user)) + result = execute_program(['ssh', target, "%s" % command]) + logging.debug("finished executing %s on %s as %s", command, target_machine_fqdn, user) return result @@ -97,16 +98,16 @@ def getUpsStatus(): def __init__(self): HTMLParser.__init__(self) - self.TokenList = [] + self.token_list = [] def handle_data(self, data): data = data.strip() if data and len(data) > 0: - self.TokenList.append(data) + self.token_list.append(data) # print data - def GetTokenList(self): - return self.TokenList + def get_token_list(self): + return self.token_list from urllib.request import urlopen try: @@ -114,21 +115,20 @@ def getUpsStatus(): f = urlopen(url) res = f.read() f.close() - except: + except BaseException: print("bad read") return h = MyHTMLParser() h.feed(res) - tokensList = h.GetTokenList() # noqa:F841 + _tokensList = h.get_token_list() # noqa:F841 + raise NotImplementedError('the implementation is not complete') if __name__ == '__main__': - from SimpaDbUtil import wakeUp - """ - for i in range(30): - machineName = 'simpatix%d' % (i+10) - print 'lom ip of %s is %s' % (machineName, getLightOutManagementIpAddress(machineName)) - """ + from .SimpaDbUtil import wakeUp + # for i in range(30): + # machineName = 'simpatix%d' % (i+10) + # print 'lom ip of %s is %s' % (machineName, getLightOutManagementIpAddress(machineName)) wakeUp('simpatix21') # print putToSleep('simpatix13') # print isNonRespondingMachineSleeping('simpatix13') diff --git a/test/test_cocluto.py b/test/test_cocluto.py index 675e498..1af62f8 100644 --- a/test/test_cocluto.py +++ b/test/test_cocluto.py @@ -20,7 +20,7 @@ class CoclutoTestCase(unittest.TestCase): qstat_output = file.read() # qstatParser = ClusterController.QstatParser() qstatParser = QstatParser() - job_state = qstatParser.parseQstatOutput(qstat_output, cluster_domain='ipr.univ-rennes1.fr') + job_state = qstatParser.parse_qstat_output(qstat_output, cluster_domain='ipr.univ-rennes1.fr') self.assertIsInstance(job_state, JobsState)