fixed pylint errors and cleaned up

work related to Bug 3315 - make simpaweb django app a packageable application
This commit is contained in:
Guillaume Raffy 2023-05-23 17:27:12 +02:00
parent 7a5d32dec0
commit 270304f58e
28 changed files with 2323 additions and 2293 deletions

View File

@ -2,50 +2,61 @@
import sys
sys.path.insert(0, '..')
import os
import MySQLdb
import threading
from Lib.Util import *
from Lib.SimpaDbUtil import *
import time
from ClusterStatus import ClusterStatus
from SlotAllocator import *
from Log import *
from ClusterNodeStatusUpdater import *
from SlotAllocator import DecoupledSlotAllocator
from Log import logDebug, logInfo
from ClusterNodeStatusUpdater import IWakeUpCompleteNotifier, ISleepCompleteNotifier
from SunGridEngine import SunGridEngine
import Util
from Util import log, onException
from WebServer import WebServerThread
from PowerState import PowerState
from HTMLParser import HTMLParser
VERSION = '1.18'
class MyHTMLParser(HTMLParser):
    """Collects the non-empty text fragments of an HTML document.

    Feed HTML through HTMLParser.feed(); every whitespace-stripped,
    non-empty text chunk is appended to an internal token list.
    """
    def __init__(self):
        HTMLParser.__init__(self)
        self.TokenList = []  # ordered list of text tokens collected so far

    def handle_data(self, data):
        # called by HTMLParser for every text node between tags
        data = data.strip()
        if data:  # an empty string is falsy; the extra len(data) > 0 test was redundant
            self.TokenList.append(data)

    def GetTokenList(self):
        """Returns the list of text tokens collected so far."""
        return self.TokenList
class WakeUpCompleteNotifier(IWakeUpCompleteNotifier):
    """Forwards a machine's wake-up completion event to the cluster controller."""

    def __init__(self, machineName, clusterController):
        self.m_clusterController = clusterController
        self.m_machineName = machineName

    def onWakeUpComplete(self):
        """Called once the machine has finished waking up."""
        logDebug('WakeUpCompleteNotifier::onWakeUpComplete : start')
        controller = self.m_clusterController
        controller.onMachineWakeUpComplete(self.m_machineName)
class SleepCompleteNotifier(ISleepCompleteNotifier):
    """Forwards a machine's sleep completion event to the cluster controller."""

    def __init__(self, machineName, clusterController):
        self.m_clusterController = clusterController
        self.m_machineName = machineName

    def onSleepComplete(self, bSleepSucceeded):
        """Called once the sleep attempt has finished; bSleepSucceeded tells whether it worked."""
        logDebug('SleepCompleteNotifier::onSleepComplete : start')
        controller = self.m_clusterController
        controller.onMachineSleepComplete(self.m_machineName, bSleepSucceeded)
def jouleToKwh(fEnergyInJoules):
    """
    converts an energy value from joules to kWh

    :param float fEnergyInJoules: the energy, in joules
    :return float: the same energy, expressed in kilowatt-hours
    """
    # 1 kWh = 1000 W * 3600 s = 3 600 000 J
    return fEnergyInJoules / (1000.0 * 3600.0)
class ClusterController:
"""
The cluster controller monitors the cluster's activity and has multiple purposes :
@ -86,7 +98,7 @@ class ClusterController:
return self.m_clusterStatus
def log(self, message):
print message
print(message)
def shutdownLeastImportantNode(self):
self.log("ClusterController::shutdownLeastImportantNode : start")
@ -144,7 +156,6 @@ class ClusterController:
while self.getNumPendingSleeps() > 0:
time.sleep(1)
def wakeUpMachinesForPendingJobs(self):
listOfMachinesThatNeedWakeUp = []
@ -186,23 +197,23 @@ class ClusterController:
def storeSessionInDatabase(self):
conn = MySQLdb.connect('simpatix10', 'clusterctrl', '', 'clustercontroller')
assert(conn)
assert conn
# retrieve the session id, as it's an auto_increment field
sqlCommand = "SELECT AUTO_INCREMENT FROM information_schema.TABLES WHERE TABLE_SCHEMA = 'clustercontroller' AND TABLE_NAME = 'sessions_desc'"
print sqlCommand
print(sqlCommand)
conn.query(sqlCommand)
r = conn.store_result()
iSessionId = r.fetch_row()[0][0]
# stores information about the session
sqlCommand = "INSERT INTO `sessions_desc` (`start_time`, end_time, `program_version`, `machine_name`, `pid`, num_controlled_machines) VALUES (NOW(), NOW(), '%s', 'simpatix10', %d, %d);" % (VERSION, os.getpid(), len(self.m_clusterStatus.m_clusterNodes))
print sqlCommand
print(sqlCommand)
conn.query(sqlCommand)
# initialize the energy savings table
sqlCommand = "INSERT INTO session_to_energy_savings (session_id, energy_savings_kwh) VALUES (%d,0.0);" % (iSessionId)
print sqlCommand
print(sqlCommand)
conn.query(sqlCommand)
conn.close()
@ -211,16 +222,16 @@ class ClusterController:
def updateSessionEnergyConsumptionInDatabase(self):
conn = MySQLdb.connect('simpatix10', 'root', '', 'clustercontroller')
assert(conn)
assert conn
# update energy savings for the current session
sqlCommand = "UPDATE session_to_energy_savings SET energy_savings_kwh=%f WHERE session_id=%d;" % (jouleToKwh(self.m_clusterStatus.getEnergySavings()), self.m_iSessionId)
print sqlCommand
print(sqlCommand)
conn.query(sqlCommand)
# update the end time of the current session
sqlCommand = "UPDATE sessions_desc SET end_time=NOW() WHERE session_id=%d;" % (self.m_iSessionId)
print sqlCommand
print(sqlCommand)
conn.query(sqlCommand)
conn.close()
@ -277,7 +288,7 @@ class ClusterController:
def storeClusterNodeStatus(clusterNodeStatus):
# conn = MySQLdb.connect('simpatix10', 'measures_writer', '', 'simpa_measurements')
conn = MySQLdb.connect('simpatix10', 'root', '', 'simpa_measurements')
assert(conn)
assert conn
# conn.query("""INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('titi', 2000, NOW());""")
'''
conn.query("""SELECT * FROM fan_rpm_logs""")
@ -288,16 +299,17 @@ def storeClusterNodeStatus( clusterNodeStatus ):
sensorId = clusterNodeStatus.m_clusterNodeName + '_' + sensor.m_name
if sensor.typeName() == 'Fan':
sqlCommand = """INSERT INTO `fan_rpm_logs` (`fan_id`, `rpm`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.m_rpms) + """, NOW());"""
print sqlCommand
print(sqlCommand)
conn.query(sqlCommand)
elif sensor.typeName() == 'Temperature':
sqlCommand = """INSERT INTO `temperature_logs` (`temp_sensor_id`, `temperature`, `date`) VALUES ('""" + sensorId + """', """ + str(sensor.m_temperature) + """, NOW());"""
print sqlCommand
print(sqlCommand)
conn.query(sqlCommand)
else:
assert(False)
assert False
conn.close()
if __name__ == '__main__':
# Lib.Util.sendTextMail('SimpaCluster <guillaume.raffy@univ-rennes1.fr>', 'guillaume.raffy@univ-rennes1.fr', 'mail subject', 'mail content')
try:
@ -310,5 +322,5 @@ if __name__ == '__main__':
# machineNameToMacAddress('simpatix10')
# except AssertionError, error:
# except KeyboardInterrupt, error:
except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
Util.onException(exception)
except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
onException(exception)

View File

@ -1,10 +1,12 @@
import threading
from PowerState import *
from ClusterNodeStatusUpdater import *
from PowerState import PowerState, PowerStateToStr
from ClusterNodeStatusUpdater import ClusterNodeStatusUpdater
import Lib.Util
import Lib.SimpaDbUtil
from Log import logInfo, logWarning
from datetime import datetime
from datetime import *
class ClusterNode:
"""
@ -69,7 +71,7 @@ class ClusterNode:
self.m_machineStatusUpdater.m_bCheckPowerState = True
self.m_machineStatusUpdater.m_bCheckSensors = False
else:
assert( False )
assert False
def onNewPowerStateReading(self, powerState):
"""
@ -96,7 +98,7 @@ class ClusterNode:
elif ePowerState == PowerState.UNPLUGGED:
fCurrentIntensity = 0.0
else:
assert(False)
assert False
return fCurrentIntensity * fCurrentVoltage
def updateEnergyMeasurements(self):
@ -133,7 +135,7 @@ class ClusterNode:
def getQueueMachineName(self):
return self.getCluster().getJobsState().getQueueMachine(self.m_name).getName()
assert( self.m_queueName != None )
assert self.m_queueName is not None
return self.m_queueName
def getCluster(self):

View File

@ -2,26 +2,26 @@ import threading
import time
import Lib.Util
import Lib.SimpaDbUtil
import os
import traceback
import sys
from PowerState import *
from QstatParser import *
import Util
from PowerState import PowerState
from Log import logInfo, logDebug
from Util import blockingWakeUpMachine, blockingPutMachineToSleep, getPowerState, onException
class IWakeUpCompleteNotifier:
"""
interface for wakeup notifiers
"""
def onWakeUpComplete(self):
assert( False )
assert False
class ISleepCompleteNotifier:
"""
interface for sleep notifiers
"""
def onSleepComplete(self, bSleepSucceeded):
assert( False )
assert False
class IRequest:
GO_TO_SLEEP = 1
@ -38,7 +38,8 @@ class IRequest:
"""
processes this request
"""
assert( False ) # this method is abstract
assert False # this method is abstract
class WakeUpRequest(IRequest):
@ -47,15 +48,15 @@ class WakeUpRequest( IRequest ):
self.m_wakeUpNotifier = wakeUpNotifier
def process(self, clusterNodeStatusUpdater):
assert( clusterNodeStatusUpdater.m_bShouldAlwaysBeOn == False ) # are we attempting to wake up a machine that should always be on ?
assert clusterNodeStatusUpdater.m_bShouldAlwaysBeOn is False # are we attempting to wake up a machine that should always be on ?
logInfo('Handling wakeup request for %s' % clusterNodeStatusUpdater.getName())
bSuccess = blockingWakeUpMachine(clusterNodeStatusUpdater.getName())
assert( bSuccess )
assert bSuccess
# activate the associated machine queue
if clusterNodeStatusUpdater.setQueueActivation(True):
None # all is ok
pass # all is ok
else:
assert( False )
assert False
clusterNodeStatusUpdater.m_stateLock.acquire()
clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.ON)
clusterNodeStatusUpdater.m_stateLock.release()
@ -63,6 +64,7 @@ class WakeUpRequest( IRequest ):
logDebug('ClusterNodeStatusUpdater::run : Sending wakeup notification')
self.m_wakeUpNotifier.onWakeUpComplete()
class SleepRequest(IRequest):
def __init__(self, sleepCompleteNotifier):
@ -70,7 +72,7 @@ class SleepRequest( IRequest ):
self.m_sleepCompleteNotifier = sleepCompleteNotifier
def process(self, clusterNodeStatusUpdater):
assert( clusterNodeStatusUpdater.m_bShouldAlwaysBeOn == False ) # are we attempting to put a machine the should stay on to sleep ?
assert not clusterNodeStatusUpdater.m_bShouldAlwaysBeOn # are we attempting to put a machine the should stay on to sleep ?
logInfo('Handling sleep request for %s' % clusterNodeStatusUpdater.getName())
if clusterNodeStatusUpdater.setQueueActivation(False):
if clusterNodeStatusUpdater.queueIsEmpty():
@ -82,11 +84,11 @@ class SleepRequest( IRequest ):
if self.m_sleepCompleteNotifier:
self.m_sleepCompleteNotifier.onSleepComplete(True)
else:
assert( False )
assert False
else:
# reactivate the queue
if not clusterNodeStatusUpdater.setQueueActivation(True):
assert( False )
assert False
clusterNodeStatusUpdater.m_stateLock.acquire()
clusterNodeStatusUpdater.m_clusterNode.setPowerState(PowerState.ON) # this is necessary to reenable the various cyclic checks that were disabled on sleep request
clusterNodeStatusUpdater.m_stateLock.release()
@ -94,7 +96,8 @@ class SleepRequest( IRequest ):
if self.m_sleepCompleteNotifier:
self.m_sleepCompleteNotifier.onSleepComplete(False)
else:
assert( False )
assert False
class CheckPowerStateRequest(IRequest):
@ -102,12 +105,13 @@ class CheckPowerStateRequest( IRequest ):
IRequest.__init__(self, IRequest.CHECK_POWER_STATE)
def process(self, clusterNodeStatusUpdater):
powerState = Util.getPowerState( clusterNodeStatusUpdater.m_clusterNodeName )
powerState = getPowerState(clusterNodeStatusUpdater.m_clusterNodeName)
clusterNodeStatusUpdater.m_stateLock.acquire()
clusterNodeStatusUpdater.m_clusterNode.onNewPowerStateReading(powerState)
clusterNodeStatusUpdater.m_lastPowerStateCheckTime = time.time()
clusterNodeStatusUpdater.m_stateLock.release()
class ClusterNodeStatusUpdater(threading.Thread):
DELAY_BETWEEN_POWERSTATE_CHECKS = 5 * 60 # in seconds
@ -152,7 +156,7 @@ class ClusterNodeStatusUpdater( threading.Thread ):
while not self.m_bStop:
# handle the oldest request
request = self.popRequest()
if request != None :
if request is not None:
request.process(self)
# schedule a power state check if required
@ -164,15 +168,15 @@ class ClusterNodeStatusUpdater( threading.Thread ):
self.pushRequest(CheckPowerStateRequest())
time.sleep(1)
except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
Util.onException(exception)
except BaseException as exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
onException(exception)
def requestSleep(self, sleepCompleteNotifier=None):
assert( self.m_bShouldAlwaysBeOn == False )
assert not self.m_bShouldAlwaysBeOn
self.pushRequest(SleepRequest(sleepCompleteNotifier))
def requestWakeUp(self, wakeUpNotifier=None):
assert( self.m_bShouldAlwaysBeOn == False )
assert self.m_bShouldAlwaysBeOn is False
self.pushRequest(WakeUpRequest(wakeUpNotifier))
def getQueueMachineName(self):

View File

@ -1,10 +1,13 @@
import threading
from JobsStateUpdater import *
from JobsStateUpdater import JobsStateUpdater
import Lib.Util
import Lib.SimpaDbUtil
from ClusterNode import *
from ClusterNode import ClusterNode
from Log import logInfo, logError
from PowerState import PowerState
import time
class ClusterStatus:
"""
The current state (jobs, sensors) of the cluster
@ -36,7 +39,6 @@ class ClusterStatus:
self.m_clusterNodes[nodeName] = clusterNode
return
def setControlOnMachine(self, machineName, bControl):
if bControl:
# add machineName under control of ClusterController
@ -92,13 +94,13 @@ class ClusterStatus:
return False
# log('ClusterStatus::isReady() : '+k+' is ready')
# assert(False)
if self.m_jobsState == None:
if self.m_jobsState is None:
logInfo('ClusterStatus::isReady : not ready because waiting for jobs state')
return False
return True
def getIdleMachines(self):
assert( self.isReady )
assert self.isReady
bBUG_00000009_IS_STILL_ALIVE = True
if bBUG_00000009_IS_STILL_ALIVE:
currentTime = time.time()
@ -106,7 +108,7 @@ class ClusterStatus:
fJobsStateAge = currentTime - self.m_jobsState.getTime()
if fJobsStateAge > fJOBS_STATE_MAX_ALLOWED_AGE:
logError('ClusterStatus::getIdleMachines : age of jobs state is too old (%f s). This is bug 00000009.' % (fJobsStateAge))
assert( False )
assert False
idleMachines = {}
for machineName, machine in self.m_clusterNodes.items():
if machine.getPowerState() == PowerState.ON:
@ -181,7 +183,7 @@ class ClusterStatus:
for machine in self.m_clusterNodes.values():
queueMachine = self.m_jobsState.getQueueMachine(machine.getName())
iNumUsedSlotsOnThisMachine = queueMachine.getNumSlots() - self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
assert(iNumUsedSlotsOnThisMachine >= 0)
assert iNumUsedSlotsOnThisMachine >= 0
iNumUsedSlots += iNumUsedSlotsOnThisMachine
self.m_lock.release()
return iNumUsedSlots
@ -205,5 +207,3 @@ class ClusterStatus:
iNumSleepingSlots += self.m_jobsState.getNumFreeSlotsOnQueueMachine(queueMachine)
self.m_lock.release()
return iNumSleepingSlots

View File

@ -9,9 +9,11 @@ class JobStateFlags:
ERROR = 64
SUSPENDED = 128
class ParallelEnvironment:
    """Identifiers of the parallel environments a job can request."""
    # MPI parallel environment (the only one handled so far)
    MPI = 1
class JobRequirements:
def __init__(self):
self.m_numSlots = None
@ -28,6 +30,7 @@ class JobId:
share the same sge job identifier. To uniquely define a job array element, we also use the task id.
"""
MAX_NUM_JOBS_IN_ARRAY = 1000000
def __init__(self, iJobId, iJobArrayElementId=None):
if iJobArrayElementId is not None:
assert iJobArrayElementId <= self.MAX_NUM_JOBS_IN_ARRAY
@ -54,7 +57,7 @@ class JobId:
return True
def isJobArrayElement(self):
return (self.m_iJobArrayElementId != None)
return (self.m_iJobArrayElementId is not None)
def getMainId(self):
    """Returns the sge job identifier, without the job array task id."""
    return self.m_iJobId
@ -66,7 +69,6 @@ class JobId:
return strResult
class Job:
def __init__(self, jobId):
self.m_jobId = jobId
@ -78,51 +80,65 @@ class Job:
self.m_stateFlags = 0
self.m_jobRequirements = JobRequirements()
self.m_requestedRamPerCore = 0
def getId(self):
    """Returns the JobId that uniquely identifies this job."""
    return self.m_jobId
def setState(self, state):
self.m_stateFlags = state
def setOwner(self, jobOwner):
if self.m_owner:
assert( self.m_owner == jobOwner )
assert self.m_owner == jobOwner
self.m_owner = jobOwner
def getOwner(self):
return self.m_owner
def setStartTime(self, jobStartTime):
if self.m_startTime:
assert( self.m_startTime == jobStartTime )
assert self.m_startTime == jobStartTime
self.m_startTime = jobStartTime
def setSubmitTime(self, jobSubmitTime):
if self.m_submitTime:
assert( self.m_submitTime == jobSubmitTime )
assert self.m_submitTime == jobSubmitTime
self.m_submitTime = jobSubmitTime
def getStartTime(self):
return self.m_startTime
def setScriptName(self, jobScriptName):
if self.m_scriptName:
assert( self.m_scriptName == jobScriptName )
assert self.m_scriptName == jobScriptName
self.m_scriptName = jobScriptName
def addSlots(self, queueMachineName, numSlots):
assert( self.m_slots.get( queueMachineName ) == None )
if self.m_slots.get( queueMachineName ) == None:
assert self.m_slots.get(queueMachineName) is None
if self.m_slots.get(queueMachineName) is None:
self.m_slots[queueMachineName] = numSlots
else:
# should never happen
self.m_slots[queueMachineName] += numSlots
def getSlots(self):
return self.m_slots
def setNumRequiredSlots(self, numSlots):
    """Records in the job's requirements the number of slots this job needs."""
    self.m_jobRequirements.m_numSlots = numSlots
def isPending(self):
    """
    returns true if this job is waiting in the queue for whatever reason

    Note: technically returns the QUEUED bit of the state flags as an int;
    callers are expected to use the result as a boolean.
    """
    return self.m_stateFlags & JobStateFlags.QUEUED
def getRequestedRamPerCore(self):
    """
    requested RAM per core in bytes (0 when no request has been recorded)
    """
    return self.m_requestedRamPerCore
def setRequestedRamPerCore(self, requestedRam):
"""
requestedRam : requested RAM per core in bytes

View File

@ -1,5 +1,6 @@
from .Log import *
class JobsState:
"""
represents a snapshot of the state of SGE jobs as seen by the SGE command "qstat -f -u \*"
@ -19,7 +20,7 @@ class JobsState:
self.m_jobs[jobId] = job
if jobId.isJobArrayElement():
tasks = self.m_jobArrayJobs.get(jobId.m_iJobId)
if tasks == None:
if tasks is None:
tasks = {}
self.m_jobArrayJobs[jobId.m_iJobId] = tasks
tasks[jobId] = job
@ -50,14 +51,14 @@ class JobsState:
numUsedSlots = 0
for job in self.m_jobs.values():
numUsedSlotsByThisJob = job.getSlots().get(queueMachine.getName())
if numUsedSlotsByThisJob != None:
if numUsedSlotsByThisJob is not None:
# logInfo('getNumFreeSlotsOnQueueMachine : job %d uses %d slots' % (job.getId().asStr(), numUsedSlotsByThisJob))
numUsedSlots += numUsedSlotsByThisJob
else:
None
# logInfo('getNumFreeSlotsOnQueueMachine : job %d uses no slot' % job.getId().asStr())
numFreeSlots = queueMachine.getNumSlots() - numUsedSlots
assert( numFreeSlots >= 0 )
assert numFreeSlots >= 0
return numFreeSlots
def addQueueMachine(self, queueMachine):
@ -70,7 +71,7 @@ class JobsState:
queueMachine = None
for qmName, qm in self.m_queueMachines.items():
if qm.m_machineName == machineName:
assert( queueMachine == None ) # to be sure that no more than one queue machine is on a given machine
assert queueMachine is None # to be sure that no more than one queue machine is on a given machine
queueMachine = qm
return queueMachine

View File

@ -3,27 +3,31 @@ import threading
gLogFilePath = '/tmp/ClusterController.log' # '/var/log/ClusterController.log'
def log(message):
threadName = threading.currentThread().getName()
logMessage = time.asctime(time.localtime()) + ' : ' + threadName + ' : ' + message
print(logMessage)
f = open(gLogFilePath, 'a+')
assert( f )
assert f
try:
f.write(logMessage + '\n')
finally:
f.close()
def logDebug(message):
    """Logs message with the debug ([D]) severity prefix."""
    # the stray bare 'return' was removed for consistency with logInfo/logWarning/logError
    log('[D]' + message)
def logInfo(message):
    """Logs message with the info ([I]) severity prefix."""
    log('[I]' + message)
def logWarning(message):
    """Logs message with the warning ([W]) severity prefix."""
    log('[W]' + message)
def logError(message):
    """Logs message with the error ([E]) severity prefix."""
    log('[E]' + message)

View File

@ -6,6 +6,7 @@ class PowerState:
SLEEP = 3
UNPLUGGED = 4
def PowerStateToStr(powerState):
if powerState == PowerState.UNKNOWN:
return 'UNKNOWN'
@ -18,4 +19,4 @@ def PowerStateToStr( powerState ):
if powerState == PowerState.UNPLUGGED:
return 'UNPLUGGED'
else:
assert( False )
assert False

View File

@ -1,10 +1,12 @@
import io
import re
from .JobsState import *
from .QueueMachine import *
from .JobsState import JobsState
from .QueueMachine import QueueMachine, QueueMachineStateFlags
from .Util import *
from .Log import *
from .Job import *
from .Log import logError
from .Job import JobStateFlags, JobId, Job, ParallelEnvironment
import logging
class QstatParser:
def parseJobState(self, strJobStatus):
@ -30,6 +32,7 @@ class QstatParser:
else:
assert False, 'unhandled job state flag :"' + c + '"'
return jobState
def parseQueueMachineState(self, strQueueMachineStatus):
queueMachineState = 0
for i in range(0, len(strQueueMachineStatus)):
@ -49,6 +52,7 @@ class QstatParser:
else:
assert False, 'unhandled queue machine state flag :"' + c + '"'
return queueMachineState
def parseQstatOutput(self, qstatOutput):
"""
parses result of command 'qstat -f -u \* -pri'
@ -76,23 +80,22 @@ class QstatParser:
# the 2nd number is the max element index
# the 3rd number is the step between consecutive element indices
rangeMatch = re.match('^(?P<minElementIndex>[0-9]+)-(?P<maxElementIndex>[0-9]+):(?P<stepBetweenIndices>[0-9]+)$', strRange)
if rangeMatch == None:
if rangeMatch is None:
logError('unexpected format for job array details : "%s" (line="%s"' % (strRange, line))
assert(False)
assert False
iMinElementIndex = int(rangeMatch.group('minElementIndex'))
iMaxElementIndex = int(rangeMatch.group('maxElementIndex'))
iStepBetweenIndices = int(rangeMatch.group('stepBetweenIndices'))
task_ids.extend(range(iMinElementIndex, iMaxElementIndex + 1, iStepBetweenIndices))
return task_ids
# ugly hack to work around the fact that qstat truncates the fqdn of cluster nodes
# graffy@physix-master:~$ qstat -f -u \*
# queuename qtype resv/used/tot. load_avg arch states
# ---------------------------------------------------------------------------------
# main.q@physix88.ipr.univ-renne BIP 0/0/36 14.03 lx-amd64
# TODO: fix this properly by parsing the output of 'qstat -f -u \* -xml' instead of 'qstat -f -u \*'
qstatOutput = re.sub('\.univ[^ ]*', '.univ-rennes1.fr', qstatOutput)
qstatOutput = re.sub(r'\.univ[^ ]*', '.univ-rennes1.fr', qstatOutput)
jobsState = JobsState()
f = io.StringIO(qstatOutput)
@ -113,21 +116,22 @@ class QstatParser:
# ntckts The job's ticket amount in normalized fashion.
# ppri The job's -p priority as specified by the user.
jobRegularExp = re.compile( '^[ ]*(?P<jobId>[^ ]+)[ ]+(?P<JobPriority>[0-9.]+)[ ]+(?P<nurg>[0-9.]+)[ ]+(?P<npprior>[0-9.]+)[ ]+(?P<ntckts>[0-9.]+)[ ]+(?P<ppri>-?[0-9]+)[ ]+(?P<jobScriptName>[^ ]+)[ ]+(?P<jobOwner>[^ ]+)[ ]+(?P<jobStatus>[^ ]+)[ ]+(?P<jobStartOrSubmitTime>[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+(?P<numSlots>[0-9]+)[ ]+(?P<jobArrayDetails>[^\n]*)[\s]*$' )
jobRegularExp = re.compile(r'^[ ]*(?P<jobId>[^ ]+)[ ]+(?P<JobPriority>[0-9.]+)[ ]+(?P<nurg>[0-9.]+)[ ]+(?P<npprior>[0-9.]+)[ ]+(?P<ntckts>[0-9.]+)[ ]+(?P<ppri>-?[0-9]+)[ ]+(?P<jobScriptName>[^ ]+)[ ]+(?P<jobOwner>[^ ]+)[ ]+(?P<jobStatus>[^ ]+)[ ]+(?P<jobStartOrSubmitTime>[0-9][0-9]/[0-9][0-9]/[0-9][0-9][0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9])[ ]+(?P<numSlots>[0-9]+)[ ]+(?P<jobArrayDetails>[^\n]*)[\s]*$')
# example of machine line :
# allintel.q@simpatix34.univ-ren BIP 0/6/8 6.00 darwin-x86
machineRegularExp = re.compile( '^(?P<queueName>[^@]+)@(?P<machineName>[^ ]+)[ ]+(?P<queueTypeString>[^ ]+)[ ]+(?P<numReservedSlots>[^/]+)/(?P<numUsedSlots>[^/]+)/(?P<numTotalSlots>[^ ]+)[ ]+(?P<cpuLoad>[^ ]+)[\s]+(?P<archName>[^ ]+)[\s]+(?P<queueMachineStatus>[^\s]*)' )
machineRegularExp = re.compile(r'^(?P<queueName>[^@]+)@(?P<machineName>[^ ]+)[ ]+(?P<queueTypeString>[^ ]+)[ ]+(?P<numReservedSlots>[^/]+)/(?P<numUsedSlots>[^/]+)/(?P<numTotalSlots>[^ ]+)[ ]+(?P<cpuLoad>[^ ]+)[\s]+(?P<archName>[^ ]+)[\s]+(?P<queueMachineStatus>[^\s]*)')
pendingJobsHeaderRegularExp = re.compile('^ - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS[?]*')
while( len(line) > 0 ):
while len(line) > 0:
# print line
# check if the current line is a line describing a job running on a machine
matchObj = jobRegularExp.match(line)
if matchObj:
# we are dealing with a job line
if not bInPendingJobsSection:
assert( currentQueueMachine )
assert currentQueueMachine
# log('QstatParser::parseQstatOutput : jobId = "'+matchObj.group('jobId')+'"')
iJobId = int(matchObj.group('jobId'))
logging.debug('iJobId = %d' % iJobId)
jobState = self.parseJobState(matchObj.group('jobStatus'))
strJobArrayDetails = matchObj.group('jobArrayDetails')
bIsJobArray = (len(strJobArrayDetails) != 0)
@ -141,9 +145,11 @@ class QstatParser:
else:
# we are in the running jobs section, and here we expect the strJobArrayDetails to just contain the index of the job array element
iJobArrayElementIndex = int(strJobArrayDetails)
assert(iJobArrayElementIndex != 0) # sge does not allow element indices to be 0
assert iJobArrayElementIndex != 0 # sge does not allow element indices to be 0
task_ids = range(iJobArrayElementIndex, iJobArrayElementIndex + 1)
logging.debug('task_ids = %s' % task_ids)
for task_id in task_ids:
logging.debug('task_id = %s' % task_id)
jobId = None
if bIsJobArray:
jobId = JobId(iJobId, task_id)
@ -151,7 +157,7 @@ class QstatParser:
jobId = JobId(iJobId)
job = jobsState.getJob(jobId)
# logDebug('iElementIndex = %d job id = %s' % (iElementIndex, jobId.asStr()))
if job == None:
if job is None:
# this job hasn't been encountered yet in the output of qstat ...
# we could either be in the pending jobs section or in the running jobs section
job = Job(jobId)
@ -168,7 +174,7 @@ class QstatParser:
if bInPendingJobsSection:
job.setNumRequiredSlots(int(matchObj.group('numSlots')))
else:
assert( not bInPendingJobsSection ) # if we are in the pending jobs section, the job should be new
assert not bInPendingJobsSection # if we are in the pending jobs section, the job should be new
if not bInPendingJobsSection:
job.addSlots(currentQueueMachine.getName(), int(matchObj.group('numSlots')))
else:
@ -209,11 +215,12 @@ class QstatParser:
if not matchObj:
# unexpected line
print('line = "' + line + '"')
assert( False )
assert False
None
line = f.readline()
f.close()
return jobsState
def parseJobDetails(self, qstatOutput, job):
"""
adds to job the details parsed from the output of the "qstat -j <jobid>" command
@ -221,7 +228,7 @@ class QstatParser:
f = io.StringIO(qstatOutput)
line = f.readline()
fieldRegularExp = re.compile('^(?P<fieldName>[^:]+):[ ]+(?P<fieldValue>[?]*)$')
while( len(line) > 0 ):
while len(line) > 0:
# print line
# check if the current line is a line describing a job running on a machine
matchObj = fieldRegularExp.match(line)
@ -229,10 +236,10 @@ class QstatParser:
fieldName = matchObj.group('fieldName')
strFieldValue = matchObj.group('fieldValue')
if fieldName == 'job_number':
assert( job.getId().asStr() == strFieldValue )
assert job.getId().asStr() == strFieldValue
elif fieldName == 'hard_queue_list':
allowedQueues = strFieldValue.split(',')
assert(len(allowedQueues) > 0)
assert len(allowedQueues) > 0
job.m_jobRequirements.m_queues = allowedQueues
elif fieldName == 'parallel environment':
# the value could be 'ompi range: 32'
@ -240,10 +247,9 @@ class QstatParser:
if matchObj:
job.m_jobRequirements.m_parallelEnvironment = ParallelEnvironment.MPI
else:
assert( False )
assert False
else:
# ignore he other fields
None
line = f.readline()
f.close()

View File

@ -7,6 +7,7 @@ class QueueMachineStateFlags: #
OBSOLETE = 16 # the queue no longer exists but it is still visible because it still contains running jobs
SUSPENDED = 32 # the queue machine is suspended
class QueueMachine:
"""
a QueueMachine instance represents a given SGE queue on a given machine (eg allintel.q@simpatix10)
@ -19,6 +20,7 @@ class QueueMachine:
self.m_fCpuLoad = None
self.m_stateFlags = 0
self.m_strDisableMessage = ''
def getName(self):
"""
returns the name of the machine queue (such as allintel.q@simpatix10)
@ -27,39 +29,53 @@ class QueueMachine:
def getQueueName(self):
return self.m_queueName
def getMachineName(self):
return self.m_machineName
def setNumSlots(self, numSlots):
self.m_numSlots = numSlots
def setNumUsedSlots(self, numSlots):
self.m_numUsedSlots = numSlots
def getNumSlots(self):
assert( self.m_numSlots != None )
assert self.m_numSlots is not None
return self.m_numSlots
def getNumUsedSlots(self):
assert( self.m_numUsedSlots != None )
assert self.m_numUsedSlots is not None
return self.m_numUsedSlots
def setCpuLoad(self, fCpuLoad):
self.m_fCpuLoad = fCpuLoad
def cpuLoadIsAvailable(self):
return self.m_fCpuLoad != None
return self.m_fCpuLoad is not None
def getCpuLoad(self):
assert( self.m_fCpuLoad != None )
assert self.m_fCpuLoad is not None
return self.m_fCpuLoad
def setState(self, state):
self.m_stateFlags = state
def isDisabled(self):
return self.m_stateFlags & QueueMachineStateFlags.DISABLED
def isInErrorState(self):
return self.m_stateFlags & QueueMachineStateFlags.ERROR
def isResponding(self):
    """Returns True unless the UNKNOWN state flag is set (machine not responding)."""
    return not (self.m_stateFlags & QueueMachineStateFlags.UNKNOWN)
def isInAlarmState(self):
return self.m_stateFlags & QueueMachineStateFlags.ALARM
def isSuspended(self):
return self.m_stateFlags & QueueMachineStateFlags.SUSPENDED
"""
def getStateAsString(self):
assert( self.m_strState != None )
assert(self.m_strState is not None)
return self.m_strState
"""

View File

@ -1,14 +1,16 @@
from PowerState import *
from Log import *
from PowerState import PowerState
from Log import logInfo
import time
import copy
class Slot:
    """A set of slots on a queue machine, together with the job they are allocated to."""

    def __init__(self):
        self.m_job = None  # job for which this slot is allocated
        self.m_queueMachine = None  # the queue machine providing the slots
        self.m_numSlots = None  # how many slots are concerned
class SlotAllocator:
"""
a class that defines a strategy for allocating free slots for the given pending jobs
@ -17,7 +19,8 @@ class SlotAllocator:
"""
returns the list of machines that need to wake up to make pending jobs running
"""
assert( False ) # this method is abstract
assert False # this method is abstract
class SimpleSlotAllocator(SlotAllocator):
def getMachinesThatNeedWakeUp(self, pendingJobs, clusterState):
@ -42,7 +45,7 @@ class SimpleSlotAllocator( SlotAllocator ):
remainingNumSlotsToAllocate -= numSlotsAllocatedOnThisMachine
numFreeSlots[queueMachine] -= numSlotsAllocatedOnThisMachine
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate)
assert( remainingNumSlotsToAllocate >= 0 )
assert remainingNumSlotsToAllocate >= 0
if remainingNumSlotsToAllocate == 0:
break
if remainingNumSlotsToAllocate > 0:
@ -58,13 +61,14 @@ class SimpleSlotAllocator( SlotAllocator ):
numFreeSlots[queueMachine] -= numSlotsAllocatedOnThisMachine
machinesThatNeedWakeUp[machine.getName()] = machine
logInfo('SimpleSlotAllocator::getMachinesThatNeedWakeUp : still %d slots to find' % remainingNumSlotsToAllocate)
assert( remainingNumSlotsToAllocate >= 0 )
assert remainingNumSlotsToAllocate >= 0
if remainingNumSlotsToAllocate == 0:
break
if remainingNumSlotsToAllocate != 0:
return {} # not enough slots available
return machinesThatNeedWakeUp
class DecoupledSlotAllocator(SlotAllocator):
"""
a slot allocator that doesn't know much about sge, and does not attempt to guess what sge's scheduler would do
@ -74,6 +78,7 @@ class DecoupledSlotAllocator( SlotAllocator ):
self.m_delayBetweenPeriodicChecks = -1 # in seconds. Disable periodic checks by setting this to -1
self.m_lastCheckTime = time.time()
self.m_lastClusterState = None
def jobsStateHasChanged(self, newClusterState):
"""
returns true if there is a change in the cluster state that can cause a pending job
@ -115,6 +120,7 @@ class DecoupledSlotAllocator( SlotAllocator ):
# at least one old job has finished, freeing some slots
bJobsHaveChanged = True
return bJobsHaveChanged
def getMachinesThatNeedWakeUp(self, pendingJobs, clusterState):
machinesThatNeedWakeUp = {}
bJobsStateHasChanged = self.jobsStateHasChanged(clusterState)

View File

@ -1,5 +1,8 @@
import Util
from QstatParser import *
import time
from Util import executeProgram
from QstatParser import QstatParser
from Log import logDebug, logWarning
class SunGridEngine:
@ -21,7 +24,6 @@ class SunGridEngine:
jobsState = QstatParser().parseQstatOutput(qstatOutput)
jobsState.setTime(time.time())
# read the requirements for pending jobs (which parallel environment, which queue, which architecture) from sge
if False: # no need for job details at the moment and since it's very slow, it's been disabled
for unused_jobId, job in jobsState.getPendingJobs().items():
@ -50,9 +52,7 @@ class SunGridEngine:
def queueIsEmpty(self, strMachineName):
(returnCode, qstatOutput, unused_stderr) = executeProgram(['qstat', '-f', '-u', '*'])
assert( returnCode == 0 )
assert returnCode == 0
jobsState = QstatParser().parseQstatOutput(qstatOutput)
jobs = jobsState.getJobsOnMachine(strMachineName)
return (len(jobs) == 0)

View File

@ -1,12 +1,12 @@
#!/usr/bin/env python
import sys
sys.path.insert(0, '..')
from Log import *
from Log import logInfo
import Util
from PowerState import *
from PowerState import PowerState
from HTMLParser import HTMLParser
def Test0000():
logInfo('Testing bug 00000003 if a series of wake up, goto sleep can shutdown a machine')
strTargetMachineName = 'simpatix12'
@ -14,15 +14,16 @@ def Test0000():
while True:
if ePowerState == PowerState.ON:
bSuccess = Util.blockingPutMachineToSleep(strTargetMachineName)
assert( bSuccess )
assert bSuccess
bSuccess = Util.blockingPutMachineToSleep(strTargetMachineName)
ePowerState = PowerState.SLEEP
elif ePowerState == PowerState.SLEEP:
bSuccess = Util.blockingWakeUpMachine(strTargetMachineName)
assert( bSuccess )
assert bSuccess
ePowerState = PowerState.ON
else:
assert(False)
assert False
def Test0001():
logInfo('Testing bug 00000003 : could it be caused by a sleep and a power on at the same tim ?')
@ -30,12 +31,13 @@ def Test0001():
ePowerState = Util.getPowerState(strTargetMachineName)
if ePowerState == PowerState.SLEEP:
bSuccess = Util.blockingWakeUpMachine(strTargetMachineName)
assert( bSuccess )
assert bSuccess
ePowerState = PowerState.ON
assert(ePowerState == PowerState.ON)
assert ePowerState == PowerState.ON
Util.executeCommand("ssh %s 'pmset sleepnow'" % strTargetMachineName)
bSuccess = Util.blockingWakeUpMachine(strTargetMachineName)
assert(bSuccess)
assert bSuccess
def Test0002():
logInfo('Testing bug 00000003 : could it be caused by a power on quickly followed by a sleep ?')
@ -43,11 +45,12 @@ def Test0002():
ePowerState = Util.getPowerState(strTargetMachineName)
if ePowerState == PowerState.ON:
bSuccess = Util.blockingWakeUpMachine(strTargetMachineName)
assert( bSuccess )
assert bSuccess
ePowerState = PowerState.SLEEP
assert(ePowerState == PowerState.SLEEP)
assert ePowerState == PowerState.SLEEP
Util.executeIpmiCommand(strTargetMachineName, 'chassis power on')
Util.executeCommand("ssh %s 'pmset sleepnow'" % strTargetMachineName)
if __name__ == '__main__':
Test0000()

View File

@ -1,13 +1,14 @@
# import .Util
# import ..SimpaDbUtil
from .Log import *
from .PowerState import *
from .Log import logDebug, logInfo, logWarning, logError
from .PowerState import PowerState, PowerStateToStr
import re
import io
import os
import traceback
import sys
def executeProgram(astrArguments):
bBUG_00000008_IS_STILL_ACTIVE = True
if bBUG_00000008_IS_STILL_ACTIVE:
@ -22,12 +23,14 @@ def executeProgram( astrArguments ):
logDebug('executeCommand : stderr of [%s] = %s' % (','.join(astrArguments), stderr))
return (returnCode, stdout, stderr)
def executeCommand(command):
# logDebug('executeCommand : command = ' + command)
(returnCode, stdout, stderr) = Lib.Util.executeCommand(command)
# logDebug('executeCommand : return code of "'+command+'" = '+str(returnCode))
return (returnCode, stdout, stderr)
def executeIpmiCommand(machineName, ipmiCommandArgs):
lomIpAddress = Lib.SimpaDbUtil.getLightOutManagementIpAddress(machineName)
lomPasswordFilepath = '/usr/local/etc/LightOutManagementPassword.txt'
@ -65,6 +68,7 @@ def executeIpmiCommand( machineName, ipmiCommandArgs ):
return (returnCode, stdout, stderr)
def getPowerState(machineName):
ePowerState = PowerState.UNKNOWN
bPowerStateRead = False
@ -72,17 +76,17 @@ def getPowerState( machineName ):
while not bPowerStateRead:
(returnCode, stdout, stderr) = executeIpmiCommand(machineName, ['sensor', 'get', 'ACPI State'])
if returnCode == 0:
matchObj = re.search('\[(?P<AcpiState>S[0-9][^\:]*)\:', stdout)
matchObj = re.search(r'\[(?P<AcpiState>S[0-9][^\:]*)\:', stdout)
bBUG_00000002_IS_STILL_ACTIVE = True
if bBUG_00000002_IS_STILL_ACTIVE:
if matchObj == None:
if matchObj is None:
# the following warning has been commented out because it pollutes the logs and apparently
# it's a 'feature' of all 4 core machines : if the machine is woken up using ipmitool, then
# no power on event is logged ...
# logWarning('degenerate ipmitool output for machine %s (see bug 00000002). Assuming power in on because that''s what I noticed when I had the case.' % machineName)
return PowerState.ON
else:
assert( matchObj )
assert matchObj
strAcpiState = matchObj.group('AcpiState')
if strAcpiState == 'S0/G0':
ePowerState = PowerState.ON
@ -92,7 +96,7 @@ def getPowerState( machineName ):
ePowerState = PowerState.OFF
else:
print(strAcpiState)
assert( False )
assert False
bPowerStateRead = True
else:
# error ... it's either because the machine is unplugged or because the machine is busy (well I'm not sure what happened but I had the case where the command failed for no apparent reason, and therefore I suspect it to be busy). In order to differentiate these 2 cases, we try again and if this command fails too many times then we decide it's unplugged (very dodgy I know but I'm disappointed that this command doesn't always work, and for now I don't know other ways to differentiate between these cases....)
@ -107,6 +111,7 @@ def getPowerState( machineName ):
bPowerStateRead = True
return ePowerState
def wakeUpMachine(machineName):
"""
this method seems more reliable than wake on lan (sometimes, sending wake on lan packet seems to have no effect)
@ -117,6 +122,7 @@ def wakeUpMachine( machineName ):
bSuccess = (returnCode == 0) # this command can fail if the machine is manually unplugged for example
return bSuccess
def blockingPutMachineToSleep(machineName):
"""
@return true on success, false otherwise
@ -143,7 +149,7 @@ def blockingPutMachineToSleep( machineName ):
else:
if ePowerState != PowerState.ON:
logWarning('unexpectedly, powerState of %s is %s' % (machineName, PowerStateToStr(ePowerState)))
assert(ePowerState == PowerState.ON)
assert ePowerState == PowerState.ON
iAttempt += 1
if iAttempt > iMaxNumAttempts:
if bBUG_239_IS_STILL_ALIVE:
@ -156,6 +162,7 @@ def blockingPutMachineToSleep( machineName ):
logWarning('the attempt to put %s to sleep failed... trying again' % (machineName))
return True
def blockingWakeUpMachine(machineName):
logInfo('waking up machine %s...' % machineName)
numAttempts = 0
@ -169,7 +176,7 @@ def blockingWakeUpMachine(machineName):
iNumWakeUpAttempts += 1
# the previous command can fail if the machine is already in a transition
# in that case we try several times before giving up
if(bWakeUpMachineSucceeded == False):
if not bWakeUpMachineSucceeded:
if iNumWakeUpAttempts < iMaxNumWakeUpAttempts:
iDelay = 5
logWarning('wake up attempt %d of %s failed... I\'ll try again in %d seconds' % (iNumWakeUpAttempts, machineName, iDelay))
@ -202,6 +209,7 @@ def blockingWakeUpMachine(machineName):
logInfo('Waking up of machine %s completed successfully' % machineName)
return True
def onException(exception):
sys.stdout.flush()
strExceptionType = type(exception)
@ -224,5 +232,3 @@ def onException(exception):
pass
executeCommand('kill -9 %d' % os.getpid()) # stop other threads immediately
exit()

View File

@ -1,5 +1,6 @@
import Sensor
class ClusterNodeSensorsReadings:
"""
@ -15,14 +16,18 @@ class ClusterNodeSensorsReadings:
self.m_sensors = {}
# self.m_powerState = ClusterNodeStatus.POWERSTATE_UNKNOWN
return
def addSensor(self, sensor):
self.m_sensors[sensor.m_name] = sensor
def dump(self):
for key, sensor in self.m_sensors.items():
sensor.dump()
return
# def getPowerState(self):
# return self.m_powerState
def getLowestTemperature(self):
# log('ClusterNodeSensorsReadings::getLowestTemperature : start')
lowestTemperature = 0.0
@ -37,6 +42,6 @@ class ClusterNodeSensorsReadings:
else:
lowestTemperature = sensor.m_temperature
lowestTemperatureIsDefined = True
assert( lowestTemperatureIsDefined )
assert lowestTemperatureIsDefined
# log('ClusterNodeSensorsReadings::getLowestTemperature : end')
return lowestTemperature

View File

@ -3,37 +3,38 @@ import re
from Sensor import FanSensor, TemperatureSensor
from ClusterNodeSensorsReadings import ClusterNodeSensorsReadings
class IpmiTool202Parser:
def parseSensorOutput(self, strOutput, clusterNodeName):
sensorReadings = ClusterNodeSensorsReadings(clusterNodeName)
f = io.StringIO(strOutput)
line = f.readline()
while( len(line) > 0 ):
while len(line) > 0:
# print line,
matchObj = re.match( '^Sensor ID[ ]*\: \'(?P<sensorName>[a-zA-Z 0-9]+)\'', line )
matchObj = re.match(r'^Sensor ID[ ]*\: \'(?P<sensorName>[a-zA-Z 0-9]+)\'', line)
if matchObj:
sensorName = matchObj.group('sensorName')
# print sensorName
# read the entity id
line = f.readline()
matchObj = re.match( '^ Entity ID[ ]*\: (?P<entityId>[0-9\.]+)', line )
assert(matchObj)
matchObj = re.match(r'^ Entity ID[ ]*\: (?P<entityId>[0-9\.]+)', line)
assert matchObj
entityId = matchObj.group('entityId')
# print entityId
# read the sensor type
line = f.readline()
matchObj = re.match( '^ Sensor Type[\(\)a-zA-Z ]*\: (?P<sensorType>[a-zA-Z \(\)]+)', line )
assert(matchObj)
matchObj = re.match(r'^ Sensor Type[\(\)a-zA-Z ]*\: (?P<sensorType>[a-zA-Z \(\)]+)', line)
assert matchObj
sensorType = matchObj.group('sensorType')
# print sensorType
if sensorType == 'Fan':
rpms = self.parseFanSensorOutput(f)
if temperature != None:
if temperature is not None:
sensor = FanSensor(sensorName)
sensor.m_rpms = rpms
elif sensorType == 'Temperature':
temperature = self.parseTemperatureSensorOutput(f)
if temperature != None:
if temperature is not None:
sensor = TemperatureSensor(sensorName)
sensor.m_temperature = temperature
else:
@ -46,21 +47,22 @@ class IpmiTool202Parser:
# assert(False)
line = f.readline()
f.close()
def parseFanSensorOutput(self, file):
"""
reads the fan specific ipdmitool output
"""
line = file.readline()
# print line
matchObj = re.match( '^ Sensor Reading[ ]*\: (?P<numRpms>[0-9]+) \(\+/\- (?P<rpmsPrecision>[0-9]+)\) RPM', line )
if(matchObj):
matchObj = re.match(r'^ Sensor Reading[ ]*\: (?P<numRpms>[0-9]+) \(\+/\- (?P<rpmsPrecision>[0-9]+)\) RPM', line)
if matchObj:
numRpms = matchObj.group('numRpms')
# print numRpms
rpms = float(numRpms)
return rpms
else:
matchObj = re.match( '^ Sensor Reading[ ]*\: Not Present', line )
assert(matchObj)
matchObj = re.match(r'^ Sensor Reading[ ]*\: Not Present', line)
assert matchObj
return None
def parseTemperatureSensorOutput(self, file):
@ -70,12 +72,12 @@ class IpmiTool202Parser:
# Sensor Reading : 36 (+/- 0) degrees C
line = file.readline()
# print line
matchObj = re.match( '^ Sensor Reading[ ]*\: (?P<temperature>[0-9]+) \(\+/\- (?P<precision>[0-9]+)\) degrees C', line )
if(matchObj):
matchObj = re.match(r'^ Sensor Reading[ ]*\: (?P<temperature>[0-9]+) \(\+/\- (?P<precision>[0-9]+)\) degrees C', line)
if matchObj:
temperature = matchObj.group('temperature')
temperature = float(temperature)
return temperature
else:
matchObj = re.match( '^ Sensor Reading[ ]*\: Not Present', line )
assert(matchObj)
matchObj = re.match(r'^ Sensor Reading[ ]*\: Not Present', line)
assert matchObj
return None

View File

@ -3,14 +3,15 @@ import re
from Sensor import FanSensor, TemperatureSensor
from ClusterNodeSensorsReadings import ClusterNodeSensorsReadings
class IpmiTool218Parser:
def parseSensorOutput(self, strOutput, clusterNodeName):
sensorReadings = ClusterNodeSensorsReadings(clusterNodeName)
f = io.StringIO(strOutput)
line = f.readline()
while( len(line) > 0 ):
while len(line) > 0:
# print line,
matchObj = re.match( '^(?P<sensorName>[a-zA-Z 0-9]+[a-zA-Z 0-9]*[a-zA-Z0-9])[ ]*\| (?P<sensorValue>[\.0-9]+)[ ]*\| (?P<sensorUnit>[a-zA-Z0-9][a-zA-Z 0-9]*[a-zA-Z0-9])[?]*', line )
matchObj = re.match(r'^(?P<sensorName>[a-zA-Z 0-9]+[a-zA-Z 0-9]*[a-zA-Z0-9])[ ]*\| (?P<sensorValue>[\.0-9]+)[ ]*\| (?P<sensorUnit>[a-zA-Z0-9][a-zA-Z 0-9]*[a-zA-Z0-9])[?]*', line)
if matchObj:
# log('readClusterNodeSensorsIpmiTool2_1_8 : sensorName = '+matchObj.group('sensorName'))
# log('readClusterNodeSensorsIpmiTool2_1_8 : sensorValue = '+matchObj.group('sensorValue'))
@ -36,4 +37,3 @@ class IpmiTool218Parser:
line = f.readline()
f.close()
return sensorReadings

View File

@ -6,9 +6,9 @@ if sys.version_info < (3, 0):
else:
from io import StringIO
import re
from .wol import *
from .wol import wake_on_lan
import os
from .Util import *
from .Util import executeProgram, executeCommand, log
import abc
import sqlite3
from .mysql2sqlite import mysql_to_sqlite
@ -33,7 +33,7 @@ def isMachineResponding(machineName):
# don't stop the program until we understand bug00000004
else:
log('isMachineResponding : Unexpected return code : returnCode=%d, stdout="%s", stderr="%s" , machineName = %s' % (returnCode, stdout, stderr, machineName))
assert(False)
assert False
return False
@ -63,7 +63,7 @@ class RemoteMysqlDb(ISqlDatabaseBackend):
def _connect(self):
self._conn = MySQLdb.connect(self._db_server_fqdn, self._db_user, '', self._db_name)
assert(self._conn)
assert self._conn
def query(self, sql_query):
"""
@ -163,13 +163,13 @@ class SqlDatabaseReader(object):
def machineNameToMacAddress(machineName):
conn = MySQLdb.connect('simpatix10', 'simpadb_reader', '', 'simpadb')
assert(conn)
assert conn
sqlQuery = """SELECT mac_address FROM ethernet_cards WHERE machine_name='""" + machineName + """' AND type='normal'"""
# print sqlQuery
conn.query(sqlQuery)
r = conn.store_result()
row = r.fetch_row(0)
assert( len(row) == 1)
assert len(row) == 1
# print 'row =', row
macAddress = row[0][0]
# print macAddress
@ -182,13 +182,13 @@ def getLightOutManagementIpAddress(machineName):
the light out management ip of servers allows to talk to the server even when it's asleep
"""
conn = MySQLdb.connect('simpatix10', 'simpadb_reader', '', 'simpadb')
assert(conn)
assert conn
sqlQuery = """SELECT ip_address_1,ip_address_2,ip_address_3,ip_address_4 FROM ethernet_cards WHERE machine_name='""" + machineName + """' AND type='light_out_management'"""
# print sqlQuery
conn.query(sqlQuery)
r = conn.store_result()
row = r.fetch_row(0)
assert(len(row) == 1)
assert len(row) == 1
# print 'row =', row
ipAddress = ('%s.%s.%s.%s') % (row[0][0], row[0][1], row[0][2], row[0][3])
# print macAddress
@ -199,7 +199,7 @@ def getLightOutManagementIpAddress(machineName):
def getClusterMachinesNames():
clusterMachinesNames = []
conn = MySQLdb.connect('simpatix10', 'simpadb_reader', '', 'simpadb')
assert(conn)
assert conn
sqlQuery = """SELECT name FROM machines WHERE affectation='cluster'"""
# print sqlQuery
conn.query(sqlQuery)
@ -231,7 +231,7 @@ def putToSleep(machineName):
print 'stderr :'
print stderr
"""
assert(returnCode == 0)
assert returnCode == 0
# check if the command succeeded by looking at the output (that's the only way I found)
f = StringIO.StringIO(stdout)
line = f.readline()

View File

@ -5,17 +5,23 @@
import re
# import Lib.Util
class SgeConfig:
def __init__(self):
self.m_attrs = {}
def hasAttr(self, attr_name):
return attr_name in self.m_attrs.keys()
def getAttr(self, strAttrName):
return self.m_attrs[strAttrName]
def setAttr(self, strAttrName, strAttrValue):
assert isinstance(strAttrName, str)
assert isinstance(strAttrValue, str)
self.m_attrs[strAttrName] = strAttrValue
def loadFromSgeFormat1String(self, strSgeConfigString):
"""
loads attrs from a string such as :
@ -47,7 +53,7 @@ class SgeConfig:
for strAttrDef in strSgeConfigString.split("\n"):
# print("strAttrDef=%s" % strAttrDef)
if len(strAttrDef) != 0:
matchObj = re.match( "^(?P<attrName>[^\s]+)[ ]+(?P<attrValue>[^\s].*)$", strAttrDef )
matchObj = re.match(r"^(?P<attrName>[^\s]+)[]+(?P<attrValue>[^\s].*)$", strAttrDef)
assert matchObj is not None
# print('%s = %s\n' % (matchObj.group("attrName"), matchObj.group("attrValue")))
self.m_attrs[matchObj.group("attrName")] = matchObj.group("attrValue")
@ -72,9 +78,10 @@ class SgeConfig:
for strAttrDef in strSgeConfigString.split(","):
# print strAttrDef
if len(strAttrDef) != 0:
matchObj = re.match( "^\s*(?P<attrName>[^=]+)=(?P<attrValue>.*)$", strAttrDef )
matchObj = re.match(r"^\s*(?P<attrName>[^=]+)=(?P<attrValue>.*)$", strAttrDef)
# print matchObj.group("attrName")
self.m_attrs[matchObj.group("attrName")] = matchObj.group("attrValue")
def asFormat1String(self):
strResult = ""
for (k, v) in self.m_attrs.items():
@ -120,17 +127,18 @@ class SgeConfig:
# root@physix-master:~# qconf -Mconf /tmp/global
# only a single value is allowed for configuration attribute "administrator_mail"
cleaned_value = re.sub(',\s*', ',', v)
cleaned_value = re.sub(r',\s*', ',', v)
# prevent space pollution in space separated values, such as in reporting_params (see https://bugzilla.ipr.univ-rennes1.fr/show_bug.cgi?id=2812). If spaces are not compacted, the space separated values will contain more and more spaces and at some point corrupt the value : a line containing just a backslash, such as in the following example:
# reporting_params accounting=true reporting=false \
# flush_time=00:00:15 joblog=false \
# sharelog=00:00:00
# \
cleaned_value = re.sub('\s+', ' ', cleaned_value)
cleaned_value = re.sub(r'\s+', ' ', cleaned_value)
strResult += "%s %s\n" % (k, cleaned_value)
# print("strResult=%s" % strResult)
return strResult
def asFormat2String(self):
strResult = ""
iNumAttrs = len(self.m_attrs)
@ -145,8 +153,7 @@ class SgeConfig:
iAttr += 1
# print strSgeConfigString
return strResult
def dump(self):
for (k, v) in self.m_attrs.items():
print("['%s']='%s'" % (k, v))

View File

@ -15,6 +15,7 @@ else:
from html.parser import HTMLParser
from email.mime.text import MIMEText
def sendTextMail(strFrom, to, strSubject, text):
# from = "SimpaCluster <guillaume.raffy@univ-rennes1.fr>"
mail = MIMEText(text)
@ -85,7 +86,6 @@ def executeCommandOn(target_machine_fqdn, command, user=None):
target = '%s@%s' % (user, target_machine_fqdn)
else:
target = target_machine_fqdn
result = executeProgram(['ssh', target, "%s" % command])
logging.debug("finished executing %s on %s as %s" % (command, target_machine_fqdn, user))
return result
@ -94,6 +94,7 @@ def executeCommandOn(target_machine_fqdn, command, user=None):
def getUpsStatus():
class MyHTMLParser(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
self.TokenList = []
@ -118,7 +119,8 @@ def getUpsStatus():
return
h = MyHTMLParser()
h.feed(res)
tokensList = h.GetTokenList() # @UnusedVariable
tokensList = h.GetTokenList() # noqa:F841
if __name__ == '__main__':
from SimpaDbUtil import wakeUp

View File

@ -1,63 +0,0 @@
'''
The goal of this application is to convert a mno database into mno's web site compatible database (drupal)
'''
import sqlite3
import os
import re
import sys
from SimpaDbUtil import SqlFile, SqlDatabaseReader
from _sqlite3 import Row
class OrchestraSqlDb(object):
    """Thin query facade over a SqlDatabaseReader.

    Decouples the parsing code from the concrete reader implementation:
    callers only ever issue SQL strings through query().
    """

    def __init__(self, sql_reader):
        """
        :param SqlDatabaseReader sql_reader: the inventory database
        """
        super(OrchestraSqlDb, self).__init__()
        self._sql_reader = sql_reader

    def query(self, sql_query):
        """Executes sql_query through the underlying reader and returns its rows.

        :param str sql_query: the SQL statement to execute
        :return: the rows returned by the underlying reader
        """
        return self._sql_reader.query(sql_query)
class Concert(object):
    """Placeholder record for a concert read from the drupal database.

    Attributes are presumably attached dynamically by callers — TODO confirm.
    """
class Recording(object):
    """Placeholder record for a recording (a concert track) read from the drupal database.

    Attributes are presumably attached dynamically by callers — TODO confirm.
    """
class OrchestraDb(object):
    """Loads the orchestra's drupal database dump and prints its concerts and recordings.

    NOTE(review): despite self.concerts being initialized, the parser currently
    only prints titles and never fills it — confirm whether population was intended.
    """

    def __init__(self, mno_drupal_db_sql_file_path):
        """
        :param str mno_drupal_db_sql_file_path: path to the drupal database sql dump file
        """
        self.concerts = {}
        sql_source = SqlFile(mno_drupal_db_sql_file_path)
        sql_reader = SqlDatabaseReader(sql_source)
        orchestra_sql_db = OrchestraSqlDb(sql_reader)
        self._parse_from_orchestra_drupal_db(orchestra_sql_db)

    def _parse_from_orchestra_drupal_db(self, orchestra_sql_db):
        """Walks the drupal node tables and prints each concert title with its recordings.

        :param OrchestraSqlDb orchestra_sql_db:
        """
        concert_rows = orchestra_sql_db.query("SELECT nid,title FROM node WHERE type is 'concert'")
        for concert_row in concert_rows:
            (nid, title) = concert_row
            print(title)
            nid = int(nid)
            # each concert node references its tracks through the field_tracks relation
            track_id_rows = orchestra_sql_db.query("SELECT field_tracks_target_id FROM field_revision_field_tracks WHERE entity_id=%d" % nid)
            for track_id_row in track_id_rows:
                (field_tracks_target_id, ) = track_id_row
                # print(field_tracks_target_id)
                track_rows = orchestra_sql_db.query("SELECT title FROM node WHERE nid=%d" % field_tracks_target_id)
                (recording_title, ) = track_rows[0]
                print("\t%s" % recording_title)
mno_db = OrchestraDb('/Users/graffy/data/Perso/MeltingNotes_work.git/website/v2_drupal/melting_drupal.sql')

View File

@ -1,5 +1,6 @@
import re
def mysql_to_sqlite(mysql_sql_code, truncate_hex_strings=False):
"""
converts mysql-compatible sql code into sqlite-compatible sql code
@ -56,7 +57,6 @@ def mysql_to_sqlite( mysql_sql_code, truncate_hex_strings = False ):
# ADD KEY `blocked_ip` (`ip`);
content = re.sub(r'alter table [^;]*;', '', content, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
# COMMIT;
# sqlite3.OperationalError: cannot commit - no transaction is active
content = re.sub(r'commit\s*;', '', content, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
@ -67,13 +67,12 @@ def mysql_to_sqlite( mysql_sql_code, truncate_hex_strings = False ):
# INSERTVALS_RE = re.compile(r'^(INSERT INTO.*?VALUES)\s*((\[^\)](\)));$', re.IGNORECASE | re.MULTILINE | re.DOTALL)
INSERTVALS_SPLIT_RE = re.compile(r'\)\s*,\s*\(', re.IGNORECASE | re.MULTILINE | re.DOTALL)
def insertvals_replacer(match):
insert, values = match.groups()
# print("insert=%s"%insert)
# print("values=%s"%values)
values = re.sub('^\s*\(' ,'', values)
values = re.sub('\)\s*$' ,'', values)
values = re.sub(r'^\s*\(', '', values)
values = re.sub(r'\)\s*$', '', values)
replacement = ''
for vals in INSERTVALS_SPLIT_RE.split(values):
# print("vals=%s"%vals)

View File

@ -4,6 +4,7 @@
import socket
import struct
def wake_on_lan(macaddress):
""" Switches on remote computers using WOL. """
@ -32,7 +33,6 @@ def wake_on_lan(macaddress):
if __name__ == '__main__':
# Use mac addresses with any separators.
wake_on_lan('00:1E:52:F3:61:60') # simpatix28
# wake_on_lan('00:24:36:F2:D0:FA') # simpatix33

View File

@ -1,7 +1,8 @@
from setuptools import setup
setup(name='cocluto',
version=1.00,
setup(
name='cocluto',
version=1.01,
description='compute cluster utility tools',
url='https://git.ipr.univ-rennes1.fr/graffy/cocluto',
author='Guillaume Raffy',