refactored cluster related code (renamed Job as Task and Job2 as Job to avoid confusion)

when Job2 was introduced, it was actually representing a job, but the class Job (which actually represented a Task, and this was misleading) already existed

also:
- added type hinting to ease code understanding
- took this opportunity to fix styling issues
This commit is contained in:
Guillaume Raffy 2024-06-13 13:59:49 +02:00
parent f36b2d9d9c
commit 6bf69f909b
3 changed files with 170 additions and 134 deletions

View File

@ -1,3 +1,6 @@
from typing import Optional, Dict, List
from datetime import datetime
class JobStateFlags: class JobStateFlags:
RUNNING = 1 # the job is running RUNNING = 1 # the job is running
@ -14,133 +17,161 @@ class ParallelEnvironment:
MPI = 1 MPI = 1
MemoryUnit = int # number of bytes
JobState = int # combination of JobStateFlags
JobId = int # the job id in the sense of sge's job id : a simple unique number
TaskId = int # the id of a task within its array Job
QueueId = str # eg 'main.q'
QueueMachineId = str # the identifier of a queue machine, eg 'main.q@physix99.ipr.univ-rennes.fr'
ResourceRequest = str # eg 'mem_available=5G'
class JobRequirements: class JobRequirements:
num_slots: Optional[int]
architecture: Optional[str] # machine architecture
m_parallelEnvironment: Optional[int] # todo: make ParallelEnvironment an Enum
queues: Optional[List[QueueId]] # the list of queues this job is allowed to run on
resources: Optional[List[ResourceRequest]]
def __init__(self): def __init__(self):
self.m_numSlots = None self.num_slots = None
self.m_strArchitecture = None # machine architecture self.architecture = None
self.m_parallelEnvironment = None self.m_parallelEnvironment = None
self.m_queues = None # the list of queues this job is allowed to run on self.queues = None
self.resources = None
class JobId: class TaskUid:
""" """
the identifier of a job. the identifier of a task, in the form <job_id>.<element_id>
We treat each element of a job array as a separate job We treat each element of a job array as a separate task
A single integer is no longer enough to identify a job because all elements in a job array A single integer is no longer enough to identify a job because all elements in a job array
share the same sge job identifier. To uniquely define a job array element, we also use the task id. share the same sge job identifier. To uniquely define a job array element, we also use the task id.
""" """
job_id: JobId
task_id: Optional[TaskId] # the identifier of a task within its job None if this identifier does not refer to a job array element
MAX_NUM_JOBS_IN_ARRAY = 1000000 MAX_NUM_JOBS_IN_ARRAY = 1000000
def __init__(self, iJobId, iJobArrayElementId=None): def __init__(self, job_id: JobId, task_id: Optional[TaskId] = None):
if iJobArrayElementId is not None: if task_id is not None:
assert iJobArrayElementId <= self.MAX_NUM_JOBS_IN_ARRAY assert task_id <= self.MAX_NUM_JOBS_IN_ARRAY
self.m_iJobId = iJobId self.job_id = job_id
self.m_iJobArrayElementId = iJobArrayElementId # None if this identifier does not refer to a job array element self.task_id = task_id
def __hash__(self): def __hash__(self):
""" """
required to use a JobId as a dict hash key required to use a TaskUid as a dict hash key
""" """
hash = self.m_iJobId * self.MAX_NUM_JOBS_IN_ARRAY hash = self.job_id * self.MAX_NUM_JOBS_IN_ARRAY
if self.m_iJobArrayElementId is not None: if self.task_id is not None:
hash += self.m_iJobArrayElementId hash += self.task_id
return hash return hash
def __eq__(self, other): def __eq__(self, other: 'TaskUid'):
""" """
required to use a JobId as a dict hash key required to use a TaskUid as a dict hash key
""" """
if self.m_iJobId != other.m_iJobId: if self.job_id != other.job_id:
return False return False
if self.m_iJobArrayElementId != other.m_iJobArrayElementId: if self.task_id != other.task_id:
return False return False
return True return True
def isJobArrayElement(self): def is_job_array_element(self) -> bool:
return (self.m_iJobArrayElementId is not None) return (self.task_id is not None)
def getMainId(self): def get_job_id(self) -> JobId:
return self.m_iJobId return self.job_id
def asStr(self): def as_str(self):
strResult = '%s' % self.m_iJobId result = '%s' % self.job_id
if self.isJobArrayElement(): if self.is_job_array_element():
strResult += '.%d' % self.m_iJobArrayElementId result += '.%d' % self.task_id
return strResult return result
class Job: class Task:
def __init__(self, jobId): task_uid: TaskUid # the unique identified of this task, eg '12345.789'
self.m_jobId = jobId start_time: Optional[datetime]
self.m_startTime = None submit_time: Optional[datetime]
self.m_submitTime = None owner: Optional[str]
self.m_owner = None script_name: Optional[str]
self.m_scriptName = None slots: Dict[QueueMachineId, int]
self.m_slots = {} state_flags: JobStateFlags
self.m_stateFlags = 0 job_requirements: JobRequirements
self.m_jobRequirements = JobRequirements() requested_ram_per_core: MemoryUnit
self.m_requestedRamPerCore = 0
def getId(self): def __init__(self, task_uid):
return self.m_jobId self.task_uid = task_uid
self.start_time = None
self.submit_time = None
self.owner = None
self.script_name = None
self.slots = {}
self.state_flags = 0
self.job_requirements = JobRequirements()
self.requested_ram_per_core = 0
def setState(self, state): def get_id(self):
self.m_stateFlags = state return self.task_uid
def setOwner(self, jobOwner): def set_state(self, state: JobState):
if self.m_owner: self.state_flags = state
assert self.m_owner == jobOwner
self.m_owner = jobOwner
def getOwner(self): def set_owner(self, job_owner: str):
return self.m_owner if self.owner:
assert self.owner == job_owner
self.owner = job_owner
def setStartTime(self, jobStartTime): def get_owner(self) -> str:
if self.m_startTime: return self.owner
assert self.m_startTime == jobStartTime
self.m_startTime = jobStartTime
def setSubmitTime(self, jobSubmitTime): def set_start_time(self, job_start_time: datetime):
if self.m_submitTime: if self.start_time:
assert self.m_submitTime == jobSubmitTime assert self.start_time == job_start_time
self.m_submitTime = jobSubmitTime self.start_time = job_start_time
def getStartTime(self): def get_submit_time(self, job_submit_time: datetime):
return self.m_startTime if self.submit_time:
assert self.submit_time == job_submit_time
self.submit_time = job_submit_time
def setScriptName(self, jobScriptName): def get_start_time(self) -> datetime:
if self.m_scriptName: return self.start_time
assert self.m_scriptName == jobScriptName
self.m_scriptName = jobScriptName
def addSlots(self, queueMachineName, numSlots): def set_script_name(self, job_script_name: str):
assert self.m_slots.get(queueMachineName) is None if self.script_name:
if self.m_slots.get(queueMachineName) is None: assert self.script_name == job_script_name
self.m_slots[queueMachineName] = numSlots self.script_name = job_script_name
def add_slots(self, queue_machine_id: QueueMachineId, num_slots: int):
assert self.slots.get(queue_machine_id) is None
if self.slots.get(queue_machine_id) is None:
self.slots[queue_machine_id] = num_slots
else: else:
# should never happen # should never happen
self.m_slots[queueMachineName] += numSlots self.slots[queue_machine_id] += num_slots
def getSlots(self): def get_slots(self) -> Dict[QueueMachineId, int]:
return self.m_slots return self.slots
def setNumRequiredSlots(self, numSlots): def set_num_required_slots(self, num_slots: int):
self.m_jobRequirements.m_numSlots = numSlots self.job_requirements.num_slots = num_slots
def isPending(self): def is_pending(self):
""" """
returns true if this job is waiting in the queue for whatever reason returns true if this task is waiting in the queue for whatever reason
""" """
return self.m_stateFlags & JobStateFlags.QUEUED return self.state_flags & JobStateFlags.QUEUED
def getRequestedRamPerCore(self): def get_requested_ram_per_core(self) -> MemoryUnit:
""" """
requested RAM per core in bytes requested RAM per core in bytes
""" """
return self.m_requestedRamPerCore return self.requested_ram_per_core
def setRequestedRamPerCore(self, requestedRam): def set_requested_ram_per_core(self, requested_ram: MemoryUnit):
""" """
requestedRam : requested RAM per core in bytes requestedRam : requested RAM per core in bytes
""" """
self.m_requestedRamPerCore = requestedRam self.requested_ram_per_core = requested_ram

View File

@ -1,35 +1,40 @@
from typing import Dict
from .Log import * from .Log import *
from .Job import Task, TaskUid
class JobsState: class JobsState:
""" """
represents a snapshot of the state of SGE jobs as seen by the SGE command "qstat -f -u \*" represents a snapshot of the state of SGE jobs as seen by the SGE command "qstat -f -u \*"
""" """
tasks: Dict[TaskUid, Task]
job_array_tasks: Dict[int, Dict[TaskUid, Task]]
def __init__(self): def __init__(self):
self.m_jobs = {} # list of jobs self.tasks = {} # list of jobs
self.m_jobArrayJobs = {} # a dictionary of jobs for each job array, indexed by job array id self.job_array_tasks = {} # a dictionary of jobs for each job array, indexed by job array id
self.m_queueMachines = {} # list of queue machines such as allintel.q@simpatix10 self.m_queueMachines = {} # list of queue machines such as allintel.q@simpatix10
self.m_stateTime = None # the time at which the state was snapshot self.m_stateTime = None # the time at which the state was snapshot
def deleteAllJobs(self): def deleteAllJobs(self):
self.m_jobs = {} self.tasks = {}
self.m_jobArrayJobs = {} self.job_array_tasks = {}
def addJob(self, job): def addTask(self, task: Task):
jobId = job.getId() task_uid = task.get_id()
self.m_jobs[jobId] = job self.tasks[task_uid] = task
if jobId.isJobArrayElement(): if task_uid.is_job_array_element():
tasks = self.m_jobArrayJobs.get(jobId.m_iJobId) tasks = self.job_array_tasks.get(task_uid.job_id)
if tasks is None: if tasks is None:
tasks = {} tasks = {}
self.m_jobArrayJobs[jobId.m_iJobId] = tasks self.job_array_tasks[task_uid.job_id] = tasks
tasks[jobId] = job tasks[task_uid] = task
def getJob(self, jobId): def get_task(self, task_uid: TaskUid) -> Task:
return self.m_jobs.get(jobId) return self.tasks.get(task_uid)
def getJobArrayJobs(self, iJobArrayId): def get_job_array_tasks(self, job_array_id: int) -> Dict[TaskUid, Task]:
return self.m_jobArrayJobs.get(iJobArrayId) return self.job_array_tasks.get(job_array_id)
def setTime(self, stateTime): def setTime(self, stateTime):
self.m_stateTime = stateTime self.m_stateTime = stateTime
@ -38,19 +43,19 @@ class JobsState:
return self.m_stateTime return self.m_stateTime
def getJobsOnMachine(self, machineName): def getJobsOnMachine(self, machineName):
jobsOnMachine = {} jobs_on_machine = {}
for jobId, job in self.m_jobs.items(): for task_uid, task in self.tasks.items():
for queueMachineName, numSlots in job.getSlots().items(): for queueMachineName, numSlots in task.get_slots().items():
jobMachineName = queueMachineName.split('@')[1] jobMachineName = queueMachineName.split('@')[1]
if jobMachineName == machineName: if jobMachineName == machineName:
jobsOnMachine[jobId] = job jobs_on_machine[task_uid] = task
return jobsOnMachine return jobs_on_machine
def getNumFreeSlotsOnQueueMachine(self, queueMachine): def getNumFreeSlotsOnQueueMachine(self, queueMachine):
# logInfo('getNumFreeSlotsOnQueueMachine : looking for free slots on queuemachine %s' % queueMachine.getName()) # logInfo('getNumFreeSlotsOnQueueMachine : looking for free slots on queuemachine %s' % queueMachine.getName())
numUsedSlots = 0 numUsedSlots = 0
for job in self.m_jobs.values(): for job in self.tasks.values():
numUsedSlotsByThisJob = job.getSlots().get(queueMachine.getName()) numUsedSlotsByThisJob = job.get_slots().get(queueMachine.getName())
if numUsedSlotsByThisJob is not None: if numUsedSlotsByThisJob is not None:
# logInfo('getNumFreeSlotsOnQueueMachine : job %d uses %d slots' % (job.getId().asStr(), numUsedSlotsByThisJob)) # logInfo('getNumFreeSlotsOnQueueMachine : job %d uses %d slots' % (job.getId().asStr(), numUsedSlotsByThisJob))
numUsedSlots += numUsedSlotsByThisJob numUsedSlots += numUsedSlotsByThisJob
@ -80,7 +85,7 @@ class JobsState:
def getPendingJobs(self): def getPendingJobs(self):
pendingJobs = {} pendingJobs = {}
for jobId, job in self.m_jobs.items(): for jobId, job in self.tasks.items():
if job.isPending(): if job.is_pending():
pendingJobs[job.getId()] = job pendingJobs[job.get_id()] = job
return pendingJobs return pendingJobs

View File

@ -4,35 +4,35 @@ from .JobsState import JobsState
from .QueueMachine import QueueMachine, QueueMachineStateFlags from .QueueMachine import QueueMachine, QueueMachineStateFlags
from .Util import * from .Util import *
from .Log import logError from .Log import logError
from .Job import JobStateFlags, JobId, Job, ParallelEnvironment from .Job import JobStateFlags, TaskUid, Task, ParallelEnvironment, JobState
import logging import logging
import time import time
class QstatParser: class QstatParser:
def parseJobState(self, strJobStatus): def parseJobState(self, job_status_as_str: str) -> JobState:
jobState = 0 job_state = 0
for i in range(0, len(strJobStatus)): for i in range(0, len(job_status_as_str)):
c = strJobStatus[i] c = job_status_as_str[i]
if c == 'r': if c == 'r':
jobState += JobStateFlags.RUNNING job_state += JobStateFlags.RUNNING
elif c == 'w': elif c == 'w':
jobState += JobStateFlags.WAITING job_state += JobStateFlags.WAITING
elif c == 'q': elif c == 'q':
jobState += JobStateFlags.QUEUED job_state += JobStateFlags.QUEUED
elif c == 't': elif c == 't':
jobState += JobStateFlags.TRANSFERING job_state += JobStateFlags.TRANSFERING
elif c == 'd': elif c == 'd':
jobState += JobStateFlags.DELETED job_state += JobStateFlags.DELETED
elif c == 'h': elif c == 'h':
jobState += JobStateFlags.HOLD job_state += JobStateFlags.HOLD
elif c == 's': elif c == 's':
jobState += JobStateFlags.SUSPENDED job_state += JobStateFlags.SUSPENDED
elif c == 'E': elif c == 'E':
jobState += JobStateFlags.ERROR job_state += JobStateFlags.ERROR
else: else:
assert False, 'unhandled job state flag :"' + c + '"' assert False, 'unhandled job state flag :"' + c + '"'
return jobState return job_state
def parseQueueMachineState(self, strQueueMachineStatus): def parseQueueMachineState(self, strQueueMachineStatus):
queueMachineState = 0 queueMachineState = 0
@ -134,8 +134,8 @@ class QstatParser:
if not bInPendingJobsSection: if not bInPendingJobsSection:
assert currentQueueMachine assert currentQueueMachine
# log('QstatParser::parseQstatOutput : jobId = "'+matchObj.group('jobId')+'"') # log('QstatParser::parseQstatOutput : jobId = "'+matchObj.group('jobId')+'"')
iJobId = int(matchObj.group('jobId')) job_id = int(matchObj.group('jobId'))
logging.debug('iJobId = %d' % iJobId) logging.debug('iJobId = %d' % job_id)
jobState = self.parseJobState(matchObj.group('jobStatus')) jobState = self.parseJobState(matchObj.group('jobStatus'))
strJobArrayDetails = matchObj.group('jobArrayDetails') strJobArrayDetails = matchObj.group('jobArrayDetails')
bIsJobArray = (len(strJobArrayDetails) != 0) bIsJobArray = (len(strJobArrayDetails) != 0)
@ -154,33 +154,33 @@ class QstatParser:
logging.debug('task_ids = %s' % task_ids) logging.debug('task_ids = %s' % task_ids)
for task_id in task_ids: for task_id in task_ids:
logging.debug('task_id = %s' % task_id) logging.debug('task_id = %s' % task_id)
jobId = None task_uid = None
if bIsJobArray: if bIsJobArray:
jobId = JobId(iJobId, task_id) task_uid = TaskUid(job_id, task_id)
else: else:
jobId = JobId(iJobId) task_uid = TaskUid(job_id)
job = jobsState.getJob(jobId) task = jobsState.get_task(task_uid)
# logDebug('iElementIndex = %d job id = %s' % (iElementIndex, jobId.asStr())) # logDebug('iElementIndex = %d job id = %s' % (iElementIndex, jobId.asStr()))
if job is None: if task is None:
# this job hasn't been encountered yet in the output of qstat ... # this job hasn't been encountered yet in the output of qstat ...
# we could either be in the pending jobs section or in the running jobs section # we could either be in the pending jobs section or in the running jobs section
job = Job(jobId) task = Task(task_uid)
jobsState.addJob(job) jobsState.addTask(task)
job.setState(jobState) task.set_state(jobState)
strJobStartOrSubmitTime = matchObj.group('jobStartOrSubmitTime') strJobStartOrSubmitTime = matchObj.group('jobStartOrSubmitTime')
jobStartOrSubmitTime = time.strptime(strJobStartOrSubmitTime, '%m/%d/%Y %H:%M:%S') jobStartOrSubmitTime = time.strptime(strJobStartOrSubmitTime, '%m/%d/%Y %H:%M:%S')
if bInPendingJobsSection: if bInPendingJobsSection:
job.setSubmitTime(jobStartOrSubmitTime) task.get_submit_time(jobStartOrSubmitTime)
else: else:
job.setStartTime(jobStartOrSubmitTime) task.set_start_time(jobStartOrSubmitTime)
job.setOwner(matchObj.group('jobOwner')) task.set_owner(matchObj.group('jobOwner'))
job.setScriptName(matchObj.group('jobScriptName')) task.set_script_name(matchObj.group('jobScriptName'))
if bInPendingJobsSection: if bInPendingJobsSection:
job.setNumRequiredSlots(int(matchObj.group('numSlots'))) task.set_num_required_slots(int(matchObj.group('numSlots')))
else: else:
assert not bInPendingJobsSection # if we are in the pending jobs section, the job should be new assert not bInPendingJobsSection # if we are in the pending jobs section, the job should be new
if not bInPendingJobsSection: if not bInPendingJobsSection:
job.addSlots(currentQueueMachine.getName(), int(matchObj.group('numSlots'))) task.add_slots(currentQueueMachine.getName(), int(matchObj.group('numSlots')))
else: else:
# the current line does not describe a job # the current line does not describe a job
if not bInPendingJobsSection: if not bInPendingJobsSection: