diff --git a/cocluto/ClusterController/Job.py b/cocluto/ClusterController/Job.py index 3607ada..3b6f17f 100644 --- a/cocluto/ClusterController/Job.py +++ b/cocluto/ClusterController/Job.py @@ -1,3 +1,6 @@ +from typing import Optional, Dict, List +from datetime import datetime + class JobStateFlags: RUNNING = 1 # the job is running @@ -14,133 +17,161 @@ class ParallelEnvironment: MPI = 1 +MemoryUnit = int # number of bytes +JobState = int # combination of JobStateFlags +JobId = int # the job id in the sense of sge's job id : a simple unique number +TaskId = int # the id of a task within its array Job +QueueId = str # eg 'main.q' +QueueMachineId = str # the identifier of a queue machine, eg 'main.q@physix99.ipr.univ-rennes.fr' +ResourceRequest = str # eg 'mem_available=5G' + + class JobRequirements: + num_slots: Optional[int] + architecture: Optional[str] # machine architecture + m_parallelEnvironment: Optional[int] # todo: make ParallelEnvironment an Enum + queues: Optional[List[QueueId]] # the list of queues this job is allowed to run on + resources: Optional[List[ResourceRequest]] + def __init__(self): - self.m_numSlots = None - self.m_strArchitecture = None # machine architecture + self.num_slots = None + self.architecture = None self.m_parallelEnvironment = None - self.m_queues = None # the list of queues this job is allowed to run on + self.queues = None + self.resources = None -class JobId: +class TaskUid: """ - the identifier of a job. - We treat each element of a job array as a separate job + the identifier of a task, in the form . + We treat each element of a job array as a separate task A single integer is no longer enough to identify a job because all elements in a job array share the same sge job identifier. To uniquely define a job array element, we also use the task id. """ + job_id: JobId + task_id: Optional[TaskId] # the identifier of a task within its job None if this identifier does not refer to a job array element MAX_NUM_JOBS_IN_ARRAY = 1000000 - def __init__(self, iJobId, iJobArrayElementId=None): - if iJobArrayElementId is not None: - assert iJobArrayElementId <= self.MAX_NUM_JOBS_IN_ARRAY - self.m_iJobId = iJobId - self.m_iJobArrayElementId = iJobArrayElementId # None if this identifier does not refer to a job array element + def __init__(self, job_id: JobId, task_id: Optional[TaskId] = None): + if task_id is not None: + assert task_id <= self.MAX_NUM_JOBS_IN_ARRAY + self.job_id = job_id + self.task_id = task_id def __hash__(self): """ - required to use a JobId as a dict hash key + required to use a TaskUid as a dict hash key """ - hash = self.m_iJobId * self.MAX_NUM_JOBS_IN_ARRAY - if self.m_iJobArrayElementId is not None: - hash += self.m_iJobArrayElementId + hash = self.job_id * self.MAX_NUM_JOBS_IN_ARRAY + if self.task_id is not None: + hash += self.task_id return hash - def __eq__(self, other): + def __eq__(self, other: 'TaskUid'): """ - required to use a JobId as a dict hash key + required to use a TaskUid as a dict hash key """ - if self.m_iJobId != other.m_iJobId: + if self.job_id != other.job_id: return False - if self.m_iJobArrayElementId != other.m_iJobArrayElementId: + if self.task_id != other.task_id: return False return True - def isJobArrayElement(self): - return (self.m_iJobArrayElementId is not None) + def is_job_array_element(self) -> bool: + return (self.task_id is not None) - def getMainId(self): - return self.m_iJobId + def get_job_id(self) -> JobId: + return self.job_id - def asStr(self): - strResult = '%s' % self.m_iJobId - if self.isJobArrayElement(): - strResult += '.%d' % self.m_iJobArrayElementId - return strResult + def as_str(self): + result = '%s' % self.job_id + if self.is_job_array_element(): + result += '.%d' % self.task_id + return result -class Job: - def __init__(self, jobId): - self.m_jobId = jobId - self.m_startTime = None - self.m_submitTime = None - self.m_owner = None - self.m_scriptName = None - self.m_slots = {} - self.m_stateFlags = 0 - self.m_jobRequirements = JobRequirements() - self.m_requestedRamPerCore = 0 +class Task: + task_uid: TaskUid # the unique identified of this task, eg '12345.789' + start_time: Optional[datetime] + submit_time: Optional[datetime] + owner: Optional[str] + script_name: Optional[str] + slots: Dict[QueueMachineId, int] + state_flags: JobStateFlags + job_requirements: JobRequirements + requested_ram_per_core: MemoryUnit - def getId(self): - return self.m_jobId + def __init__(self, task_uid): + self.task_uid = task_uid + self.start_time = None + self.submit_time = None + self.owner = None + self.script_name = None + self.slots = {} + self.state_flags = 0 + self.job_requirements = JobRequirements() + self.requested_ram_per_core = 0 - def setState(self, state): - self.m_stateFlags = state + def get_id(self): + return self.task_uid - def setOwner(self, jobOwner): - if self.m_owner: - assert self.m_owner == jobOwner - self.m_owner = jobOwner + def set_state(self, state: JobState): + self.state_flags = state - def getOwner(self): - return self.m_owner + def set_owner(self, job_owner: str): + if self.owner: + assert self.owner == job_owner + self.owner = job_owner - def setStartTime(self, jobStartTime): - if self.m_startTime: - assert self.m_startTime == jobStartTime - self.m_startTime = jobStartTime + def get_owner(self) -> str: + return self.owner - def setSubmitTime(self, jobSubmitTime): - if self.m_submitTime: - assert self.m_submitTime == jobSubmitTime - self.m_submitTime = jobSubmitTime + def set_start_time(self, job_start_time: datetime): + if self.start_time: + assert self.start_time == job_start_time + self.start_time = job_start_time - def getStartTime(self): - return self.m_startTime + def get_submit_time(self, job_submit_time: datetime): + if self.submit_time: + assert self.submit_time == job_submit_time + self.submit_time = job_submit_time - def setScriptName(self, jobScriptName): - if self.m_scriptName: - assert self.m_scriptName == jobScriptName - self.m_scriptName = jobScriptName + def get_start_time(self) -> datetime: + return self.start_time - def addSlots(self, queueMachineName, numSlots): - assert self.m_slots.get(queueMachineName) is None - if self.m_slots.get(queueMachineName) is None: - self.m_slots[queueMachineName] = numSlots + def set_script_name(self, job_script_name: str): + if self.script_name: + assert self.script_name == job_script_name + self.script_name = job_script_name + + def add_slots(self, queue_machine_id: QueueMachineId, num_slots: int): + assert self.slots.get(queue_machine_id) is None + if self.slots.get(queue_machine_id) is None: + self.slots[queue_machine_id] = num_slots else: # should never happen - self.m_slots[queueMachineName] += numSlots + self.slots[queue_machine_id] += num_slots - def getSlots(self): - return self.m_slots + def get_slots(self) -> Dict[QueueMachineId, int]: + return self.slots - def setNumRequiredSlots(self, numSlots): - self.m_jobRequirements.m_numSlots = numSlots + def set_num_required_slots(self, num_slots: int): + self.job_requirements.num_slots = num_slots - def isPending(self): + def is_pending(self): """ - returns true if this job is waiting in the queue for whatever reason + returns true if this task is waiting in the queue for whatever reason """ - return self.m_stateFlags & JobStateFlags.QUEUED + return self.state_flags & JobStateFlags.QUEUED - def getRequestedRamPerCore(self): + def get_requested_ram_per_core(self) -> MemoryUnit: """ requested RAM per core in bytes """ - return self.m_requestedRamPerCore + return self.requested_ram_per_core - def setRequestedRamPerCore(self, requestedRam): + def set_requested_ram_per_core(self, requested_ram: MemoryUnit): """ requestedRam : requested RAM per core in bytes """ - self.m_requestedRamPerCore = requestedRam + self.requested_ram_per_core = requested_ram diff --git a/cocluto/ClusterController/JobsState.py b/cocluto/ClusterController/JobsState.py index 80a7df6..4251202 100644 --- a/cocluto/ClusterController/JobsState.py +++ b/cocluto/ClusterController/JobsState.py @@ -1,35 +1,40 @@ +from typing import Dict from .Log import * +from .Job import Task, TaskUid class JobsState: """ represents a snapshot of the state of SGE jobs as seen by the SGE command "qstat -f -u \*" """ + tasks: Dict[TaskUid, Task] + job_array_tasks: Dict[int, Dict[TaskUid, Task]] + def __init__(self): - self.m_jobs = {} # list of jobs - self.m_jobArrayJobs = {} # a dictionary of jobs for each job array, indexed by job array id + self.tasks = {} # list of jobs + self.job_array_tasks = {} # a dictionary of jobs for each job array, indexed by job array id self.m_queueMachines = {} # list of queue machines such as allintel.q@simpatix10 self.m_stateTime = None # the time at which the state was snapshot def deleteAllJobs(self): - self.m_jobs = {} - self.m_jobArrayJobs = {} + self.tasks = {} + self.job_array_tasks = {} - def addJob(self, job): - jobId = job.getId() - self.m_jobs[jobId] = job - if jobId.isJobArrayElement(): - tasks = self.m_jobArrayJobs.get(jobId.m_iJobId) + def addTask(self, task: Task): + task_uid = task.get_id() + self.tasks[task_uid] = task + if task_uid.is_job_array_element(): + tasks = self.job_array_tasks.get(task_uid.job_id) if tasks is None: tasks = {} - self.m_jobArrayJobs[jobId.m_iJobId] = tasks - tasks[jobId] = job + self.job_array_tasks[task_uid.job_id] = tasks + tasks[task_uid] = task - def getJob(self, jobId): - return self.m_jobs.get(jobId) + def get_task(self, task_uid: TaskUid) -> Task: + return self.tasks.get(task_uid) - def getJobArrayJobs(self, iJobArrayId): - return self.m_jobArrayJobs.get(iJobArrayId) + def get_job_array_tasks(self, job_array_id: int) -> Dict[TaskUid, Task]: + return self.job_array_tasks.get(job_array_id) def setTime(self, stateTime): self.m_stateTime = stateTime @@ -38,19 +43,19 @@ class JobsState: return self.m_stateTime def getJobsOnMachine(self, machineName): - jobsOnMachine = {} - for jobId, job in self.m_jobs.items(): - for queueMachineName, numSlots in job.getSlots().items(): + jobs_on_machine = {} + for task_uid, task in self.tasks.items(): + for queueMachineName, numSlots in task.get_slots().items(): jobMachineName = queueMachineName.split('@')[1] if jobMachineName == machineName: - jobsOnMachine[jobId] = job - return jobsOnMachine + jobs_on_machine[task_uid] = task + return jobs_on_machine def getNumFreeSlotsOnQueueMachine(self, queueMachine): # logInfo('getNumFreeSlotsOnQueueMachine : looking for free slots on queuemachine %s' % queueMachine.getName()) numUsedSlots = 0 - for job in self.m_jobs.values(): - numUsedSlotsByThisJob = job.getSlots().get(queueMachine.getName()) + for job in self.tasks.values(): + numUsedSlotsByThisJob = job.get_slots().get(queueMachine.getName()) if numUsedSlotsByThisJob is not None: # logInfo('getNumFreeSlotsOnQueueMachine : job %d uses %d slots' % (job.getId().asStr(), numUsedSlotsByThisJob)) numUsedSlots += numUsedSlotsByThisJob @@ -80,7 +85,7 @@ class JobsState: def getPendingJobs(self): pendingJobs = {} - for jobId, job in self.m_jobs.items(): - if job.isPending(): - pendingJobs[job.getId()] = job + for jobId, job in self.tasks.items(): + if job.is_pending(): + pendingJobs[job.get_id()] = job return pendingJobs diff --git a/cocluto/ClusterController/QstatParser.py b/cocluto/ClusterController/QstatParser.py index 02c6b23..f03cc5f 100644 --- a/cocluto/ClusterController/QstatParser.py +++ b/cocluto/ClusterController/QstatParser.py @@ -4,35 +4,35 @@ from .JobsState import JobsState from .QueueMachine import QueueMachine, QueueMachineStateFlags from .Util import * from .Log import logError -from .Job import JobStateFlags, JobId, Job, ParallelEnvironment +from .Job import JobStateFlags, TaskUid, Task, ParallelEnvironment, JobState import logging import time class QstatParser: - def parseJobState(self, strJobStatus): - jobState = 0 - for i in range(0, len(strJobStatus)): - c = strJobStatus[i] + def parseJobState(self, job_status_as_str: str) -> JobState: + job_state = 0 + for i in range(0, len(job_status_as_str)): + c = job_status_as_str[i] if c == 'r': - jobState += JobStateFlags.RUNNING + job_state += JobStateFlags.RUNNING elif c == 'w': - jobState += JobStateFlags.WAITING + job_state += JobStateFlags.WAITING elif c == 'q': - jobState += JobStateFlags.QUEUED + job_state += JobStateFlags.QUEUED elif c == 't': - jobState += JobStateFlags.TRANSFERING + job_state += JobStateFlags.TRANSFERING elif c == 'd': - jobState += JobStateFlags.DELETED + job_state += JobStateFlags.DELETED elif c == 'h': - jobState += JobStateFlags.HOLD + job_state += JobStateFlags.HOLD elif c == 's': - jobState += JobStateFlags.SUSPENDED + job_state += JobStateFlags.SUSPENDED elif c == 'E': - jobState += JobStateFlags.ERROR + job_state += JobStateFlags.ERROR else: assert False, 'unhandled job state flag :"' + c + '"' - return jobState + return job_state def parseQueueMachineState(self, strQueueMachineStatus): queueMachineState = 0 @@ -134,8 +134,8 @@ class QstatParser: if not bInPendingJobsSection: assert currentQueueMachine # log('QstatParser::parseQstatOutput : jobId = "'+matchObj.group('jobId')+'"') - iJobId = int(matchObj.group('jobId')) - logging.debug('iJobId = %d' % iJobId) + job_id = int(matchObj.group('jobId')) + logging.debug('iJobId = %d' % job_id) jobState = self.parseJobState(matchObj.group('jobStatus')) strJobArrayDetails = matchObj.group('jobArrayDetails') bIsJobArray = (len(strJobArrayDetails) != 0) @@ -154,33 +154,33 @@ class QstatParser: logging.debug('task_ids = %s' % task_ids) for task_id in task_ids: logging.debug('task_id = %s' % task_id) - jobId = None + task_uid = None if bIsJobArray: - jobId = JobId(iJobId, task_id) + task_uid = TaskUid(job_id, task_id) else: - jobId = JobId(iJobId) - job = jobsState.getJob(jobId) + task_uid = TaskUid(job_id) + task = jobsState.get_task(task_uid) # logDebug('iElementIndex = %d job id = %s' % (iElementIndex, jobId.asStr())) - if job is None: + if task is None: # this job hasn't been encountered yet in the output of qstat ... # we could either be in the pending jobs section or in the running jobs section - job = Job(jobId) - jobsState.addJob(job) - job.setState(jobState) + task = Task(task_uid) + jobsState.addTask(task) + task.set_state(jobState) strJobStartOrSubmitTime = matchObj.group('jobStartOrSubmitTime') jobStartOrSubmitTime = time.strptime(strJobStartOrSubmitTime, '%m/%d/%Y %H:%M:%S') if bInPendingJobsSection: - job.setSubmitTime(jobStartOrSubmitTime) + task.get_submit_time(jobStartOrSubmitTime) else: - job.setStartTime(jobStartOrSubmitTime) - job.setOwner(matchObj.group('jobOwner')) - job.setScriptName(matchObj.group('jobScriptName')) + task.set_start_time(jobStartOrSubmitTime) + task.set_owner(matchObj.group('jobOwner')) + task.set_script_name(matchObj.group('jobScriptName')) if bInPendingJobsSection: - job.setNumRequiredSlots(int(matchObj.group('numSlots'))) + task.set_num_required_slots(int(matchObj.group('numSlots'))) else: assert not bInPendingJobsSection # if we are in the pending jobs section, the job should be new if not bInPendingJobsSection: - job.addSlots(currentQueueMachine.getName(), int(matchObj.group('numSlots'))) + task.add_slots(currentQueueMachine.getName(), int(matchObj.group('numSlots'))) else: # the current line does not describe a job if not bInPendingJobsSection: