From 04adbbc684afeeeeb23716b14dfc14127054a434 Mon Sep 17 00:00:00 2001 From: Guillaume Raffy Date: Fri, 3 Apr 2026 17:01:56 +0200 Subject: [PATCH] cocluto v1.0.16 - overhauled quman argument parsing - quman's argument parsing now uses subparsers to allow for more actions than enable and disable - added the show-disable-requests action to quman - added a test mode for the command line quman, that allows the user to test quman using quman's user interface - renamed disable tag as disable id, as it's more meaningful (identifies a disable request) - made user errors more user-friendly (i.e. not polluted with call stack) work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093] --- cocluto/quman.py | 177 ++++++++++++++++++++++++++++++++++----------- cocluto/version.py | 2 +- test/test_quman.py | 2 +- 3 files changed, 135 insertions(+), 46 deletions(-) diff --git a/cocluto/quman.py b/cocluto/quman.py index be59e80..f30acbf 100755 --- a/cocluto/quman.py +++ b/cocluto/quman.py @@ -6,6 +6,7 @@ import logging import subprocess import argparse import re +import json from datetime import datetime from pathlib import Path from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField # , SqlSshAccessedMysqlDb @@ -13,7 +14,7 @@ from cocluto.ClusterController.QstatParser import QstatParser from cocluto.ClusterController.JobsState import JobsState LogId = int # identifies a log entry in the database -DisableTag = str # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc. +DisableId = str # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc. 
QueueId = str # identifies the queue eg main.q QueueMachineId = str # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr HostFqdn = str # fully qualified domain name of a host eg alambix42.ipr.univ-rennes.fr @@ -23,14 +24,20 @@ UserId = str # unix user id of a user eg graffy def is_queue_machine_id(s: str) -> bool: """returns True if the given string is a QueueMachineId (i.e., it contains '@') and False otherwise.""" # if the queue_machine contains '@', we consider it as a QueueMachineId and return it as is - return re.match(r'[a-zA-Z0-9_\.]+@[a-zA-Z0-9_\.]+$', s) is not None + return re.match(r'^[a-zA-Z0-9_\.]+@[a-zA-Z0-9_\-\.]+$', s) is not None class QueuesStatus(): is_enabled: Dict[QueueMachineId, bool] - def __init__(self): + def __init__(self, queues_status_file_path: Path = None): self.is_enabled = {} + if queues_status_file_path is not None: + self.load_from_json(queues_status_file_path) + + def load_from_json(self, path: Path): + with open(path, 'r', encoding='utf-8') as f: + self.is_enabled = json.load(f) def add_queue(self, queue_machine: QueueMachineId, is_enabled: bool): self.is_enabled[queue_machine] = is_enabled @@ -39,6 +46,10 @@ class QueuesStatus(): for queue_machine, is_enabled in self.is_enabled.items(): print(f"{queue_machine}: {'enabled' if is_enabled else 'disabled'}") + def save_as_json(self, path: Path): + with open(path, 'w', encoding='utf-8') as f: + json.dump(self.is_enabled, f, indent=2) + class IGridEngine(ABC): @@ -63,7 +74,7 @@ class MockGridEngine(IGridEngine): def disable_queue_machine(self, queue_machine: QueueMachineId): print(f"Mock disable queue {queue_machine}") - assert queue_machine in self.queues_status.is_enabled, f"Queue {queue_machine} not found in queues status" + assert queue_machine in self.queues_status.is_enabled, f"Queue {queue_machine} not found in queues status {self.queues_status.is_enabled}" assert self.queues_status.is_enabled[queue_machine], f"Queue {queue_machine} is already disabled" 
self.queues_status.is_enabled[queue_machine] = False @@ -86,8 +97,8 @@ class Sge(IGridEngine): # check that qmod command is available try: subprocess.run(["qmod", "-h"], check=True, capture_output=True) - except FileNotFoundError: - raise RuntimeError("qmod command not found. Please make sure that the grid engine client is installed and qmod command is available in the PATH.") + except FileNotFoundError as exc: + raise RuntimeError("qmod command not found. Please make sure that the grid engine client is installed and qmod command is available in the PATH.") from exc def run_qmod(self, args): """runs qmod with the given arguments.""" @@ -98,7 +109,7 @@ class Sge(IGridEngine): try: subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: - raise RuntimeError(f"qmod command failed: {e}") + raise RuntimeError(f"qmod command failed: {e}") from e def disable_queue_machine(self, queue_machine: QueueMachineId): self.run_qmod(["-d", queue_machine]) @@ -130,7 +141,7 @@ def init_db(db_backend: ISqlDatabaseBackend): SqlTableField('host_fqdn', SqlTableField.Type.FIELD_TYPE_STRING, 'the fully qualified domain name of the host on which the action is performed'), SqlTableField('queue_machines', SqlTableField.Type.FIELD_TYPE_STRING, 'the comma separated list of queue machines that were modified'), SqlTableField('action', SqlTableField.Type.FIELD_TYPE_STRING, 'the action performed: "disable" or "enable"'), - SqlTableField('disable_tag', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'), + SqlTableField('disable_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'), SqlTableField('reason', SqlTableField.Type.FIELD_TYPE_STRING, 'the reason for the modification'), ] db_backend.create_table('log', fields) @@ -170,16 +181,16 @@ class DisableRequest: host_fqdn: HostFqdn # the fully 
qualified domain name of the host on which the disable action was requested queue_machines: List[QueueMachineId] # the names of the queue machines that were modified reason: str - disable_tag: DisableTag + disable_id: DisableId timestamp: datetime - def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_machines: List[QueueMachineId], reason: str, disable_tag: DisableTag, timestamp: datetime): + def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_machines: List[QueueMachineId], reason: str, disable_id: DisableId, timestamp: datetime): self.log_id = log_id self.user_id = user_id self.host_fqdn = host_fqdn self.queue_machines = queue_machines self.reason = reason - self.disable_tag = disable_tag + self.disable_id = disable_id self.timestamp = timestamp def as_dict(self) -> Dict[str, Any]: @@ -189,7 +200,7 @@ class DisableRequest: "host_fqdn": self.host_fqdn, "queue_machines": self.queue_machines, "reason": self.reason, - "disable_tag": self.disable_tag, + "disable_id": self.disable_id, "timestamp": self.timestamp.isoformat(), } @@ -204,20 +215,20 @@ class QueueManager: grid_engine = Sge() self.grid_engine = grid_engine - def log_modification(self, queue_machines: List[QueueMachineId], action: str, disable_tag: DisableTag, reason: str) -> LogId: + def log_modification(self, queue_machines: List[QueueMachineId], action: str, disable_id: DisableId, reason: str) -> LogId: assert isinstance(queue_machines, list) assert action in ["disable", "enable"], "Action must be either 'disable' or 'enable'" userid = subprocess.check_output(['whoami']).decode().strip() host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip() timestamp = datetime.now().isoformat() - sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_tag, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{','.join(queue_machines)}', '{action}', '{disable_tag}', '{reason}');" + sql_query = f"INSERT INTO log 
(timestamp, user_id, host_fqdn, queue_machines, action, disable_id, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{','.join(queue_machines)}', '{action}', '{disable_id}', '{reason}');" self.db_backend.query(sql_query) # get the log id of the disable action that was just inserted log_id = self.db_backend.query("SELECT last_insert_rowid();")[0][0] return log_id def get_disable_requests(self, queue_machine: QueueMachineId) -> Dict[int, DisableRequest]: - sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_machines, log.reason, log.disable_tag, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_machine = '{queue_machine}' AND log.action = 'disable';" + sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_machines, log.reason, log.disable_id, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_machine = '{queue_machine}' AND log.action = 'disable';" results = self.db_backend.query(sql_query) disable_requests_as_dict = {} for row in results: @@ -227,19 +238,20 @@ class QueueManager: queue_machines = row[3].split(',') assert queue_machine == queue_machine, "All results should be for the same queue" reason = row[4] - disable_tag = row[5] + disable_id = row[5] timestamp = datetime.fromisoformat(row[6]) - disable_requests_as_dict[row[0]] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_tag=disable_tag, timestamp=timestamp) + disable_requests_as_dict[row[0]] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_id=disable_id, timestamp=timestamp) return disable_requests_as_dict - def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_tag: DisableTag, reason: str, perform_disable: bool = True): - + def request_queue_machines_deactivation(self, queue_machines: 
List[QueueMachineId], disable_id: DisableId, reason: str, perform_disable: bool = True): + logging.info("Requesting deactivation of queue machines %s with disable id '%s' for reason: %s", queue_machines, disable_id, reason) all_disable_requests = {} for queue_machine in queue_machines: all_disable_requests[queue_machine] = self.get_disable_requests(queue_machine) disable_requests = all_disable_requests[queue_machine] for dr in disable_requests.values(): - assert dr.disable_tag != disable_tag, f"Disable tag {disable_tag} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first." + if dr.disable_id == disable_id: + raise RuntimeError(f"Disable id {disable_id} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first.") for queue_machine in queue_machines: if perform_disable: @@ -248,26 +260,27 @@ class QueueManager: # queue is currently active, we can disable it self.grid_engine.disable_queue_machine(queue_machine) - disable_log_id = self.log_modification(queue_machines, "disable", disable_tag, reason) + disable_log_id = self.log_modification(queue_machines, "disable", disable_id, reason) for queue_machine in queue_machines: self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_machine) VALUES ({disable_log_id}, '{queue_machine}');") - def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_tag: DisableTag, reason: str, perform_enable: bool = True): + def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_enable: bool = True): for queue_machine in queue_machines: disable_requests = self.get_disable_requests(queue_machine) dr_to_remove = None # the disable reason to remove for dr in disable_requests.values(): - 
if dr.disable_tag == disable_tag: + if dr.disable_id == disable_id: dr_to_remove = dr break - assert dr_to_remove is not None, f"Disable tag {disable_tag} has not requested deactivation of queue {queue_machine}. Cannot request activation without a prior deactivation." + if dr_to_remove is None: + raise RuntimeError(f"Disable id {disable_id} has not requested deactivation of queue {queue_machine}. Cannot request activation without a prior deactivation.") if perform_enable: if len(disable_requests) == 1: # queue is currently disabled and there is only one disable reason, we can enable it self.grid_engine.enable_queue_machine(queue_machine) - enable_log_id = self.log_modification(queue_machines, "enable", disable_tag, reason) # noqa: F841 pylint:disable=unused-variable + enable_log_id = self.log_modification(queue_machines, "enable", disable_id, reason) # noqa: F841 pylint:disable=unused-variable for queue_machine in queue_machines: self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';") @@ -285,23 +298,24 @@ class QueueManager: for queue_machine, is_enabled in qs.is_enabled.items(): if queue_machine not in db_queues: # if the queue is not in the database, we add it - logging.warning(f"Queue {queue_machine} is not in the database, adding it.") + logging.warning("Queue %s is not in the database, adding it.", queue_machine) self.db_backend.query(f"INSERT INTO queues (queue_machine) VALUES ('{queue_machine}');") else: db_queues.remove(queue_machine) # we remove it from the set of queues in the database to keep track of the queues that are in the database but not in the grid engine disable_requests = self.get_disable_requests(queue_machine) if not is_enabled and len(disable_requests) == 0: - # queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_tag "unknown" and reason "synchronized with grid engine" + # queue is disabled in 
the grid engine but there is no disable reason in the database, we add a disable reason with disable_id "unknown" and reason "synchronized with grid engine" self.request_queue_machines_deactivation([queue_machine], "quman-sync", "synchronized with grid engine", perform_disable=False) assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none." elif is_enabled and len(disable_requests) > 0: - # queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_tag "unknown" with reason "synchronized with grid engine" + # queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_id "unknown" with reason "synchronized with grid engine" for dr in disable_requests.values(): - self.request_queue_machines_activation([queue_machine], dr.disable_tag, "synchronized with grid engine", perform_enable=False) + self.request_queue_machines_activation([queue_machine], dr.disable_id, "synchronized with grid engine", perform_enable=False) assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests." for queue_machine in db_queues: - logging.warning(f"Queue {queue_machine} is in the database but not in the grid engine.") + logging.warning("Queue %s is in the database but not in the grid engine. 
Removing it from the database.", queue_machine) self.db_backend.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';") + self.db_backend.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';") def get_state_as_json(self) -> Dict[str, Any]: """returns the state of the queues as a json string.""" @@ -321,35 +335,110 @@ class QueueManager: return state def get_queue_machines(self, queue_machine: Union[QueueMachineId, QueueId]) -> List[QueueMachineId]: + logging.debug("Getting queue machines for queue_machine identifier '%s'", queue_machine) + queue_machines = [] if is_queue_machine_id(queue_machine): - return [queue_machine] + queue_machines = [queue_machine] else: status = self.grid_engine.get_status() - return [q for q in status.is_enabled if q.startswith(queue_machine)] + queue_machines = [q for q in status.is_enabled if q.startswith(queue_machine)] + assert len(queue_machines) > 0, f"No queue machine found for queue identifier '{queue_machine}'" + logging.debug("Found queue machines %s for queue_machine identifier '%s'", queue_machines, queue_machine) + return queue_machines def main(): + parser = argparse.ArgumentParser( + description="qman: manage queue disable/enable requests with logging", + prog="quman", + epilog="Example usage:\n" + " quman add-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance'\n" + " quman remove-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance completed'\n" + " quman add-disable-request main.q --disable-id admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'\n" + " quman show-disable-requests main.q@node42\n" + " quman show-disable-requests\n", + formatter_class=argparse.RawDescriptionHelpFormatter + ) + subparsers = parser.add_subparsers(dest="action", help="Action to perform") + parser.add_argument("--test", action="store_true", help="Run in test mode with MockGridEngine and a local sqlite database. 
This is meant for testing and development purposes and should not be used in production.") + + # add-disable action + add_parser = subparsers.add_parser("add-disable-request", help="adds a disable request to a queue") + add_parser.add_argument("queue", help="Queue to disable (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)") + add_parser.add_argument("--disable-id", required=True, help="id for the disable request (e.g., croconaus, manual.graffy, etc.)") + add_parser.add_argument("--reason", required=True, help="Reason for the disabling") + + # remove-disable action + remove_parser = subparsers.add_parser("remove-disable-request", help="removes a disable request on a queue") + remove_parser.add_argument("queue", help="Queue to enable (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)") + remove_parser.add_argument("--disable-id", required=True, help="id of the disable request to remove") + remove_parser.add_argument("--reason", default="Manual removal", help="Reason for the removal") + + # show-disable-requests action + show_parser = subparsers.add_parser("show-disable-requests", help="Show disable requests") + show_parser.add_argument("queue", nargs="?", default=None, help="Optional: specific queue to show (if omitted, shows all)") + try: - parser = argparse.ArgumentParser(description="qmod wrapper to manage queue states with a counter and logging.", epilog="Example usage: quman d main.q --disable-tag admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'") - parser.add_argument("action", choices=["d", "e"], help="Action: d (deactivate) or e (activate)") - parser.add_argument("queue", help="Queue to modify (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)") - parser.add_argument("--reason", required=True, help="Reason for the deactivation/activation") - parser.add_argument("--disable-tag", required=True, help="tag for the disable request (e.g., auto.croconaus, manual.graffy, etc.)") + args = parser.parse_args() + test_mode 
= args.test + + if test_mode: + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + logging.warning("Running in test mode with MockGridEngine. No actual queue will be enabled or disabled.") + + if args.action is None: + parser.print_help() + exit(1) + + if test_mode: + queues_status_file_path = Path('./quman_test/queues_status.json') + if queues_status_file_path.exists(): + logging.info("Loading queues status from %s", queues_status_file_path) + qs = QueuesStatus(queues_status_file_path) + else: + logging.info("No queues status file found at %s. Initializing with default queues status.", queues_status_file_path) + qs = QueuesStatus() + for node_id in range(40, 44): + qs.add_queue(f'main.q@alambix{node_id}.ipr.univ-rennes.fr', True) + qs.add_queue('gpuonly.q@alambix42', True) + grid_engine = MockGridEngine(qs) + else: + grid_engine = Sge() db_backend = create_db_backend() - quman = QueueManager(db_backend, Sge(dry_run=False)) # set dry_run to False to actually run qmod commands + quman = QueueManager(db_backend, grid_engine) quman.synchronize_with_grid_engine() - queue_machines = quman.get_queue_machines(args.queue) - if args.action == "d": - quman.request_queue_machines_deactivation(queue_machines, args.disable_tag, args.reason) - elif args.action == "e": - quman.request_queue_machines_activation(queue_machines, args.disable_tag, args.reason) + if args.action == "add-disable-request": + queue_machines = quman.get_queue_machines(args.queue) + quman.request_queue_machines_deactivation(queue_machines, args.disable_id, args.reason) + elif args.action == "remove-disable-request": + queue_machines = quman.get_queue_machines(args.queue) + quman.request_queue_machines_activation(queue_machines, args.disable_id, args.reason) + elif args.action == "show-disable-requests": + if args.queue is None: + # Show all disable requests + state = quman.get_state_as_json() + print(json.dumps(state, indent=2)) + else: + # Show disable requests 
for a specific queue + queue_machines = quman.get_queue_machines(args.queue) + state = {} + for queue_machine in queue_machines: + disable_requests = quman.get_disable_requests(queue_machine) + state[queue_machine] = { + "disable_requests": [dr.as_dict() for dr in disable_requests.values()] + } + print(json.dumps(state, indent=2)) + + if test_mode: + grid_engine.get_status().save_as_json(queues_status_file_path) + except RuntimeError as e: - sys.stderr.write(f"An error occurred: {e}\n") + sys.stderr.write(f"ERROR: {e}\n") exit(1) diff --git a/cocluto/version.py b/cocluto/version.py index cf9f82c..c99af16 100644 --- a/cocluto/version.py +++ b/cocluto/version.py @@ -1,4 +1,4 @@ -__version__ = '1.0.15' +__version__ = '1.0.16' class Version(object): diff --git a/test/test_quman.py b/test/test_quman.py index 31ef335..f3e3173 100644 --- a/test/test_quman.py +++ b/test/test_quman.py @@ -40,7 +40,7 @@ class QumanTestCase(unittest.TestCase): print('disable requests:') print(json.dumps(quman.get_state_as_json(), indent=2)) quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'disabled to move the alambix42 to another rack') - with self.assertRaises(AssertionError): + with self.assertRaises(RuntimeError): # attempting to disable the same queue again with the same disable tag should raise an assertion error (the tag is used to uniquely identify the disables on the machine) quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'because I want to test quman') quman.request_queue_machines_deactivation(['main.q@alambix42'], 'croconaus.maco-update', 'disabled to update maco')