From 0f0d5f800e5f1978428326ea67117500629b90f7 Mon Sep 17 00:00:00 2001
From: Guillaume Raffy
Date: Wed, 1 Apr 2026 19:08:06 +0200
Subject: [PATCH] v 1.0.12

- fixed bug in request_queue_activation, which caused it to always enable the queue, even if there are other disables
- added a synchronization mechanism to quman which patches the database to ensure the database is coherent with the current activation of the queues

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
---
 cocluto/quman.py   | 104 ++++++++++++++++++++++++++++++++++++++++++---
 cocluto/version.py |   2 +-
 test/test_quman.py |  26 ++++++++++--
 3 files changed, 121 insertions(+), 11 deletions(-)

diff --git a/cocluto/quman.py b/cocluto/quman.py
index 4a0f118..08d5f81 100755
--- a/cocluto/quman.py
+++ b/cocluto/quman.py
@@ -1,17 +1,33 @@
 #!/usr/bin/env python3
 from abc import ABC, abstractmethod
-from typing import Dict
+from typing import Dict, Any
 import subprocess
 import argparse
 from datetime import datetime
 from pathlib import Path
 from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField  # , SqlSshAccessedMysqlDb
+from cocluto.ClusterController.QstatParser import QstatParser
+from cocluto.ClusterController.JobsState import JobsState
 
 LogId = int  # identifies a log entry in the database
 RequesterId = str  # identifies the queue enable/disable requester eg auto.croconaus, manual.graffy, etc.
 QueueMachineId = str  # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr
 
 
+class QueuesStatus():
+    is_enabled: Dict[QueueMachineId, bool]
+
+    def __init__(self):
+        self.is_enabled = {}
+
+    def add_queue(self, queue_name: QueueMachineId, is_enabled: bool):
+        self.is_enabled[queue_name] = is_enabled
+
+    def print(self):
+        for queue_name, is_enabled in self.is_enabled.items():
+            print(f"{queue_name}: {'enabled' if is_enabled else 'disabled'}")
+
+
 class IGridEngine(ABC):
 
     @abstractmethod
@@ -22,6 +38,32 @@ class IGridEngine(ABC):
     def enable_queue(self, queue_name: QueueMachineId):
         pass
 
+    @abstractmethod
+    def get_status(self) -> QueuesStatus:
+        pass
+
+
+class MockGridEngine(IGridEngine):
+    queues_status: QueuesStatus
+
+    def __init__(self, queues_status: QueuesStatus):
+        self.queues_status = queues_status
+
+    def disable_queue(self, queue_name: QueueMachineId):
+        print(f"Mock disable queue {queue_name}")
+        assert queue_name in self.queues_status.is_enabled, f"Queue {queue_name} not found in queues status"
+        assert self.queues_status.is_enabled[queue_name], f"Queue {queue_name} is already disabled"
+        self.queues_status.is_enabled[queue_name] = False
+
+    def enable_queue(self, queue_name: QueueMachineId):
+        print(f"Mock enable queue {queue_name}")
+        assert queue_name in self.queues_status.is_enabled, f"Queue {queue_name} not found in queues status"
+        assert not self.queues_status.is_enabled[queue_name], f"Queue {queue_name} is already enabled"
+        self.queues_status.is_enabled[queue_name] = True
+
+    def get_status(self) -> QueuesStatus:
+        return self.queues_status
+
 
 class Sge(IGridEngine):
     dry_run: bool
@@ -43,6 +85,18 @@ class Sge(IGridEngine):
     def enable_queue(self, queue_name: QueueMachineId):
         self.run_qmod(["-e", queue_name])
 
+    def get_status(self) -> QueuesStatus:
+        process = subprocess.run(['qstat', '-f', '-u', '*'], check=True, capture_output=True)
+        # the full qstat listing (all users) is parsed by QstatParser into a JobsState
+        # NOTE(review): assumes getQueueMachines() returns a dict of queue machines (python3: values(), not itervalues()) - confirm
+        queues_status = QueuesStatus()
+        jobs_state: JobsState = QstatParser.parseQstatOutput(process.stdout.decode())
+        queue_machines = jobs_state.getQueueMachines()
+        for queue_machine in queue_machines.values():
+            queues_status.add_queue(queue_machine.get_name(), queue_machine.is_enabled())
+
+        return queues_status
+
 
 def init_db(db_backend: ISqlDatabaseBackend):
     if not db_backend.table_exists('log'):
@@ -115,20 +169,21 @@ class QueueManager:
             assert row[1] == queue_name, "All results should be for the same queue"
         return {row[0]: DisableReason(log_id=row[0], queue_name=row[1], reason=row[2], requester_id=row[3], timestamp=datetime.fromisoformat(row[4])) for row in results}
 
-    def request_queue_deactivation(self, queue_name: QueueMachineId, requester_id: RequesterId, reason: str):
+    def request_queue_deactivation(self, queue_name: QueueMachineId, requester_id: RequesterId, reason: str, perform_disable: bool = True):
 
         disable_reasons = self.get_disable_reasons(queue_name)
         for dr in disable_reasons.values():
             assert dr.requester_id != requester_id, f"Requester {requester_id} has already requested deactivation of queue {queue_name} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first."
 
-        if len(disable_reasons) == 0:
-            # queue is currently active, we can disable it
-            self.grid_engine.disable_queue(queue_name)
+        if perform_disable:
+            if len(disable_reasons) == 0:
+                # queue is currently active, we can disable it
+                self.grid_engine.disable_queue(queue_name)
 
         disable_log_id = self.log_modification(queue_name, "disable", requester_id, reason)
         self.db_backend.query(f"INSERT INTO state (disable_reason_id, queue_name) VALUES ({disable_log_id}, '{queue_name}');")
 
-    def request_queue_activation(self, queue_name: QueueMachineId, requester_id: RequesterId, reason: str):
+    def request_queue_activation(self, queue_name: QueueMachineId, requester_id: RequesterId, reason: str, perform_enable: bool = True):
         disable_reasons = self.get_disable_reasons(queue_name)
         dr_to_remove = None  # the disable reason to remove
         for dr in disable_reasons.values():
@@ -138,10 +193,45 @@ class QueueManager:
 
         assert dr_to_remove is not None, f"Requester {requester_id} has not requested deactivation of queue {queue_name}. Cannot request activation without a prior deactivation."
 
-        self.grid_engine.enable_queue(queue_name)
+        if perform_enable:
+            if len(disable_reasons) == 1:
+                # queue is currently disabled and there is only one disable reason, we can enable it
+                self.grid_engine.enable_queue(queue_name)
         enable_log_id = self.log_modification(queue_name, "enable", requester_id, reason)  # noqa: F841
         self.db_backend.query(f"DELETE FROM state WHERE disable_reason_id = {dr_to_remove.log_id} AND queue_name = '{queue_name}';")
 
+    def synchronize_with_grid_engine(self):
+        """synchronizes the state of the queues in the database with the actual state of the queues as reported by the grid engine's get_status()."""
+        qs = self.grid_engine.get_status()
+        for queue_name, is_enabled in qs.is_enabled.items():
+            disable_reasons = self.get_disable_reasons(queue_name)
+            if not is_enabled and len(disable_reasons) == 0:
+                # queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with requester_id "quman-sync" and reason "synchronized with grid engine" (the grid engine itself is left untouched)
+                self.request_queue_deactivation(queue_name, "quman-sync", "synchronized with grid engine", perform_disable=False)
+                assert len(self.get_disable_reasons(queue_name)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_name} but there are none."
+            elif is_enabled and len(disable_reasons) > 0:
+                # queue is enabled in the grid engine but there are disable reasons in the database, we remove every disable reason of this queue, on behalf of its own requester, with reason "synchronized with grid engine" (the grid engine itself is left untouched)
+                for dr in disable_reasons.values():
+                    self.request_queue_activation(queue_name, dr.requester_id, "synchronized with grid engine", perform_enable=False)
+                assert len(self.get_disable_reasons(queue_name)) == 0, f"After synchronization, there should be no disable reasons for queue {queue_name} but there are still {len(self.get_disable_reasons(queue_name))} disable reasons."
+
+    def get_state_as_json(self) -> Dict[str, Any]:
+        """returns the disable reasons of each queue listed in the state table as a json-serializable dictionary (queue name -> {"disable_reasons": [...]})."""
+        # get the list of queue names from the state table in the database
+        sql_query = "SELECT DISTINCT queue_name FROM state;"
+        results = self.db_backend.query(sql_query)
+        for row in results:
+            assert len(row) == 1, "Each row should have only one column (queue_name)"
+        queue_names = [row[0] for row in results]
+
+        state = {}
+        for queue_name in queue_names:
+            disable_reasons = self.get_disable_reasons(queue_name)
+            state[queue_name] = {
+                "disable_reasons": [{"reason": dr.reason, "requester_id": dr.requester_id, "timestamp": dr.timestamp.isoformat()} for dr in disable_reasons.values()]
+            }
+        return state
+
 
 def main():
 
diff --git a/cocluto/version.py b/cocluto/version.py
index 7dca1b7..2fe35b4 100644
--- a/cocluto/version.py
+++ b/cocluto/version.py
@@ -1,4 +1,4 @@
-__version__ = '1.0.11'
+__version__ = '1.0.12'
 
 
 class Version(object):
diff --git a/test/test_quman.py b/test/test_quman.py
index 3d113b8..561fad4 100644
--- a/test/test_quman.py
+++ b/test/test_quman.py
@@ -1,9 +1,10 @@
 from pathlib import Path
+import json
 import unittest
 import logging
 # from cocluto import ClusterController
 from cocluto.SimpaDbUtil import SqliteDb
-from cocluto.quman import QueueManager, init_db, Sge
+from cocluto.quman import QueueManager, init_db, MockGridEngine, QueuesStatus
 
 
 class QumanTestCase(unittest.TestCase):
@@ -20,15 +21,34 @@ class QumanTestCase(unittest.TestCase):
             db_path.unlink()
         db_backend = SqliteDb(db_path)
         init_db(db_backend)
-        quman = QueueManager(db_backend, Sge(dry_run=True))  # set dry_run to True to not actually run qmod commands
+        qs = QueuesStatus()
+        for node_id in range(40, 44):
+            qs.add_queue(f'main.q@alambix{node_id}', True)
+        qs.add_queue('gpuonly.q@alambix42', True)
+        grid_engine = MockGridEngine(qs)
+        grid_engine.disable_queue('main.q@alambix42')  # simulate that the queue is already disabled
+        quman = QueueManager(db_backend, grid_engine)
+        print('queues state:')
+        grid_engine.queues_status.print()
+        print('disable requests:')
+        print(json.dumps(quman.get_state_as_json(), indent=2))
+        print('synchronizing with grid engine...')
+        quman.synchronize_with_grid_engine()
+        print('queues state:')
+        grid_engine.queues_status.print()
+        print('disable requests:')
+        print(json.dumps(quman.get_state_as_json(), indent=2))
         quman.request_queue_deactivation('main.q@alambix42', 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
         with self.assertRaises(AssertionError):
             # attempting to disable the same queue again with the same disable tag should raise an assertion error (the tag is used to uniquely identify the disables on the machine)
             quman.request_queue_deactivation('main.q@alambix42', 'sysadmin.graffy', 'because I want to test quman')
         quman.request_queue_deactivation('main.q@alambix42', 'croconaus.maco-update', 'disabled to update maco')
         quman.request_queue_activation('main.q@alambix42', 'sysadmin.graffy', 'alambix42 has been moved to a new rack')
-        # self.assertIsInstance(job_state, JobsState)
         db_backend.dump(Path('./quman_test/quman_dump.sql'))
+        print('queues state:')
+        grid_engine.queues_status.print()
+        print('disable requests:')
+        print(json.dumps(quman.get_state_as_json(), indent=2))
 
 
 if __name__ == '__main__':