diff --git a/cocluto/quman.py b/cocluto/quman.py index 29d1269..e34e29a 100755 --- a/cocluto/quman.py +++ b/cocluto/quman.py @@ -1,8 +1,10 @@ #!/usr/bin/env python3 from abc import ABC, abstractmethod -from typing import Dict, Any +from typing import List, Dict, Any, Union +import logging import subprocess import argparse +import re from datetime import datetime from pathlib import Path from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField # , SqlSshAccessedMysqlDb @@ -11,33 +13,40 @@ from cocluto.ClusterController.JobsState import JobsState LogId = int # identifies a log entry in the database DisableTag = str # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc. +QueueId = str # identifies the queue eg main.q QueueMachineId = str # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr HostFqdn = str # fully qualified domain name of a host eg alambix42.ipr.univ-rennes.fr UserId = str # unix user id of a user eg graffy +def is_queue_machine_id(s: str) -> bool: + """returns True if the given string is a QueueMachineId (i.e., it contains '@') and False otherwise.""" + # if the queue_machine contains '@', we consider it as a QueueMachineId and return it as is + return re.match(r'[a-zA-Z0-9_\.]+@[a-zA-Z0-9_\.]+$', s) is not None + + class QueuesStatus(): is_enabled: Dict[QueueMachineId, bool] def __init__(self): self.is_enabled = {} - def add_queue(self, queue_name: QueueMachineId, is_enabled: bool): - self.is_enabled[queue_name] = is_enabled + def add_queue(self, queue_machine: QueueMachineId, is_enabled: bool): + self.is_enabled[queue_machine] = is_enabled def print(self): - for queue_name, is_enabled in self.is_enabled.items(): - print(f"{queue_name}: {'enabled' if is_enabled else 'disabled'}") + for queue_machine, is_enabled in self.is_enabled.items(): + print(f"{queue_machine}: {'enabled' if is_enabled else 'disabled'}") class IGridEngine(ABC): @abstractmethod - def disable_queue(self, queue_name: QueueMachineId): + def disable_queue_machine(self, queue_machine: QueueMachineId): pass @abstractmethod - def enable_queue(self, queue_name: QueueMachineId): + def enable_queue_machine(self, queue_machine: QueueMachineId): pass @abstractmethod @@ -51,17 +60,17 @@ class MockGridEngine(IGridEngine): def __init__(self, queues_status: QueuesStatus): self.queues_status = queues_status - def disable_queue(self, queue_name: QueueMachineId): - print(f"Mock disable queue {queue_name}") - assert queue_name in self.queues_status.is_enabled, f"Queue {queue_name} not found in queues status" - assert self.queues_status.is_enabled[queue_name], f"Queue {queue_name} is already disabled" - self.queues_status.is_enabled[queue_name] = False + def disable_queue_machine(self, queue_machine: QueueMachineId): + print(f"Mock disable queue {queue_machine}") + assert queue_machine in self.queues_status.is_enabled, f"Queue {queue_machine} not found in queues status" + assert self.queues_status.is_enabled[queue_machine], f"Queue {queue_machine} is already disabled" + self.queues_status.is_enabled[queue_machine] = False - def enable_queue(self, queue_name: QueueMachineId): - print(f"Mock enable queue {queue_name}") - assert queue_name in self.queues_status.is_enabled, f"Queue {queue_name} not found in queues status" - assert not self.queues_status.is_enabled[queue_name], f"Queue {queue_name} is already enabled" - self.queues_status.is_enabled[queue_name] = True + def enable_queue_machine(self, queue_machine: QueueMachineId): + print(f"Mock enable queue {queue_machine}") + assert queue_machine in self.queues_status.is_enabled, f"Queue machine {queue_machine} not found in queues status" + assert not self.queues_status.is_enabled[queue_machine], f"Queue machine {queue_machine} is already enabled" + self.queues_status.is_enabled[queue_machine] = True def get_status(self) -> QueuesStatus: return self.queues_status @@ -81,11 +90,11 @@ class Sge(IGridEngine): else: subprocess.run(cmd, check=True) - def disable_queue(self, queue_name: QueueMachineId): - self.run_qmod(["-d", queue_name]) + def disable_queue_machine(self, queue_machine: QueueMachineId): + self.run_qmod(["-d", queue_machine]) - def enable_queue(self, queue_name: QueueMachineId): - self.run_qmod(["-e", queue_name]) + def enable_queue_machine(self, queue_machine: QueueMachineId): + self.run_qmod(["-e", queue_machine]) def get_status(self) -> QueuesStatus: process = subprocess.run(['qstat', '-f', '-u', '*'], check=True, capture_output=True) @@ -102,28 +111,35 @@ class Sge(IGridEngine): def init_db(db_backend: ISqlDatabaseBackend): - # a database storing the log of actions (queue activation or deactivation) + # a table storing the log of actions (queue activation or deactivation) if not db_backend.table_exists('log'): fields = [ SqlTableField('id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the modification', is_autoinc_index=True), SqlTableField('timestamp', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this modification has been made'), SqlTableField('user_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the id of user performing the action (unix user id)'), SqlTableField('host_fqdn', SqlTableField.Type.FIELD_TYPE_STRING, 'the fully qualified domain name of the host on which the action is performed'), - SqlTableField('queue_name', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue that was modified'), + SqlTableField('queue_machines', SqlTableField.Type.FIELD_TYPE_STRING, 'the comma separated list of queue machines that were modified'), SqlTableField('action', SqlTableField.Type.FIELD_TYPE_STRING, 'the action performed: "disable" or "enable"'), SqlTableField('disable_tag', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'), SqlTableField('reason', SqlTableField.Type.FIELD_TYPE_STRING, 'the reason for the modification'), ] db_backend.create_table('log', fields) - # a database storing the current disable requests + # a table storing the current disable requests if not db_backend.table_exists('disables'): fields = [ SqlTableField('disable_request_id', SqlTableField.Type.FIELD_TYPE_INT, 'log.id of the disable action that led to this state'), - SqlTableField('queue_name', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue that was modified'), + SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue machine that was disabled'), ] db_backend.create_table('disables', fields) + # a table storing the current queues + if not db_backend.table_exists('queues'): + fields = [ + SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue'), + ] + db_backend.create_table('queues', fields) + def create_db_backend() -> ISqlDatabaseBackend: # db_server_fqdn = 'alambix-master.ipr.univ-rennes.fr' @@ -142,16 +158,16 @@ class DisableRequest: db_id: int # index of this disable request in the database user_id: UserId # the id of the user who requested the disable action (unix user id) host_fqdn: HostFqdn # the fully qualified domain name of the host on which the disable action was requested - queue_name: QueueMachineId # the name of the queue that was + queue_machines: List[QueueMachineId] # the names of the queue machines that were modified reason: str disable_tag: DisableTag timestamp: datetime - def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_name: QueueMachineId, reason: str, disable_tag: DisableTag, timestamp: datetime): + def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_machines: List[QueueMachineId], reason: str, disable_tag: DisableTag, timestamp: datetime): self.log_id = log_id self.user_id = user_id self.host_fqdn = host_fqdn - self.queue_name = queue_name + self.queue_machines = queue_machines self.reason = reason self.disable_tag = disable_tag self.timestamp = timestamp @@ -161,7 +177,7 @@ class DisableRequest: "log_id": self.log_id, "user_id": self.user_id, "host_fqdn": self.host_fqdn, - "queue_name": self.queue_name, + "queue_machines": self.queue_machines, "reason": self.reason, "disable_tag": self.disable_tag, "timestamp": self.timestamp.isoformat(), @@ -176,115 +192,149 @@ class QueueManager: self.db_backend = db_backend self.grid_engine = grid_engine - def log_modification(self, queue_name: str, action: str, disable_tag: DisableTag, reason: str) -> LogId: + def log_modification(self, queue_machines: List[QueueMachineId], action: str, disable_tag: DisableTag, reason: str) -> LogId: + assert isinstance(queue_machines, list) assert action in ["disable", "enable"], "Action must be either 'disable' or 'enable'" userid = subprocess.check_output(['whoami']).decode().strip() host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip() timestamp = datetime.now().isoformat() - sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_name, action, disable_tag, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{queue_name}', '{action}', '{disable_tag}', '{reason}');" + sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_tag, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{','.join(queue_machines)}', '{action}', '{disable_tag}', '{reason}');" self.db_backend.query(sql_query) # get the log id of the disable action that was just inserted log_id = self.db_backend.query("SELECT last_insert_rowid();")[0][0] return log_id - def get_disable_requests(self, queue_name: QueueMachineId) -> Dict[int, DisableRequest]: - sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_name, log.reason, log.disable_tag, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_name = '{queue_name}' AND log.action = 'disable';" + def get_disable_requests(self, queue_machine: QueueMachineId) -> Dict[int, DisableRequest]: + sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_machines, log.reason, log.disable_tag, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_machine = '{queue_machine}' AND log.action = 'disable';" results = self.db_backend.query(sql_query) disable_requests_as_dict = {} for row in results: log_id = row[0] user_id = row[1] host_fqdn = row[2] - queue_machine = row[3] - assert queue_machine == queue_name, "All results should be for the same queue" + queue_machines = row[3].split(',') + assert queue_machine == queue_machine, "All results should be for the same queue" reason = row[4] disable_tag = row[5] timestamp = datetime.fromisoformat(row[6]) - disable_requests_as_dict[row[0]] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_name=queue_name, reason=reason, disable_tag=disable_tag, timestamp=timestamp) + disable_requests_as_dict[row[0]] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_tag=disable_tag, timestamp=timestamp) return disable_requests_as_dict - def request_queue_deactivation(self, queue_name: QueueMachineId, disable_tag: DisableTag, reason: str, perform_disable: bool = True): + def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_tag: DisableTag, reason: str, perform_disable: bool = True): - disable_requests = self.get_disable_requests(queue_name) - for dr in disable_requests.values(): - assert dr.disable_tag != disable_tag, f"Disable tag {disable_tag} has already requested deactivation of queue {queue_name} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first." + all_disable_requests = {} + for queue_machine in queue_machines: + all_disable_requests[queue_machine] = self.get_disable_requests(queue_machine) + disable_requests = all_disable_requests[queue_machine] + for dr in disable_requests.values(): + assert dr.disable_tag != disable_tag, f"Disable tag {disable_tag} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first." - if perform_disable: - if len(disable_requests) == 0: - # queue is currently active, we can disable it - self.grid_engine.disable_queue(queue_name) + for queue_machine in queue_machines: + if perform_disable: + disable_requests = all_disable_requests[queue_machine] + if len(disable_requests) == 0: + # queue is currently active, we can disable it + self.grid_engine.disable_queue_machine(queue_machine) - disable_log_id = self.log_modification(queue_name, "disable", disable_tag, reason) - self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_name) VALUES ({disable_log_id}, '{queue_name}');") + disable_log_id = self.log_modification(queue_machines, "disable", disable_tag, reason) + for queue_machine in queue_machines: + self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_machine) VALUES ({disable_log_id}, '{queue_machine}');") - def request_queue_activation(self, queue_name: QueueMachineId, disable_tag: DisableTag, reason: str, perform_enable: bool = True): - disable_requests = self.get_disable_requests(queue_name) - dr_to_remove = None # the disable reason to remove - for dr in disable_requests.values(): - if dr.disable_tag == disable_tag: - dr_to_remove = dr - break + def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_tag: DisableTag, reason: str, perform_enable: bool = True): + for queue_machine in queue_machines: + disable_requests = self.get_disable_requests(queue_machine) + dr_to_remove = None # the disable reason to remove + for dr in disable_requests.values(): + if dr.disable_tag == disable_tag: + dr_to_remove = dr + break - assert dr_to_remove is not None, f"Disable tag {disable_tag} has not requested deactivation of queue {queue_name}. Cannot request activation without a prior deactivation." + assert dr_to_remove is not None, f"Disable tag {disable_tag} has not requested deactivation of queue {queue_machine}. Cannot request activation without a prior deactivation." - if perform_enable: - if len(disable_requests) == 1: - # queue is currently disabled and there is only one disable reason, we can enable it - self.grid_engine.enable_queue(queue_name) - enable_log_id = self.log_modification(queue_name, "enable", disable_tag, reason) # noqa: F841 pylint:disable=unused-variable - self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_name = '{queue_name}';") + if perform_enable: + if len(disable_requests) == 1: + # queue is currently disabled and there is only one disable reason, we can enable it + self.grid_engine.enable_queue_machine(queue_machine) + enable_log_id = self.log_modification(queue_machines, "enable", disable_tag, reason) # noqa: F841 pylint:disable=unused-variable + for queue_machine in queue_machines: + self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';") def synchronize_with_grid_engine(self): """synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat.""" qs = self.grid_engine.get_status() - for queue_name, is_enabled in qs.is_enabled.items(): - disable_requests = self.get_disable_requests(queue_name) + + db_queues = set() + sql_query = "SELECT queue_machine FROM queues;" + results = self.db_backend.query(sql_query) + for row in results: + assert len(row) == 1, "Each row should have only one column (queue_machine)" + db_queues.add(row[0]) + + for queue_machine, is_enabled in qs.is_enabled.items(): + if queue_machine not in db_queues: + # if the queue is not in the database, we add it + logging.warning(f"Queue {queue_machine} is not in the database, adding it.") + self.db_backend.query(f"INSERT INTO queues (queue_machine) VALUES ('{queue_machine}');") + else: + db_queues.remove(queue_machine) # we remove it from the set of queues in the database to keep track of the queues that are in the database but not in the grid engine + disable_requests = self.get_disable_requests(queue_machine) if not is_enabled and len(disable_requests) == 0: # queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_tag "unknown" and reason "synchronized with grid engine" - self.request_queue_deactivation(queue_name, "quman-sync", "synchronized with grid engine", perform_disable=False) - assert len(self.get_disable_requests(queue_name)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_name} but there are none." + self.request_queue_machines_deactivation([queue_machine], "quman-sync", "synchronized with grid engine", perform_disable=False) + assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none." elif is_enabled and len(disable_requests) > 0: - # queue is enabled in the grid engine but there are disable reasons in the database, we remove all disable reasons for this queue and disable_tag "unknown" with reason "synchronized with grid engine" + # queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_tag "unknown" with reason "synchronized with grid engine" for dr in disable_requests.values(): - self.request_queue_activation(queue_name, dr.disable_tag, "synchronized with grid engine", perform_enable=False) - assert len(self.get_disable_requests(queue_name)) == 0, f"After synchronization, there should be no disable reasons for queue {queue_name} but there are still {len(self.get_disable_requests(queue_name))} disable reasons." + self.request_queue_machines_activation([queue_machine], dr.disable_tag, "synchronized with grid engine", perform_enable=False) + assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests." + for queue_machine in db_queues: + logging.warning(f"Queue {queue_machine} is in the database but not in the grid engine.") + self.db_backend.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';") def get_state_as_json(self) -> Dict[str, Any]: """returns the state of the queues as a json string.""" # get the list of queue names from the disables table in the database - sql_query = "SELECT DISTINCT queue_name FROM disables;" + sql_query = "SELECT DISTINCT queue_machine FROM disables;" results = self.db_backend.query(sql_query) for row in results: - assert len(row) == 1, "Each row should have only one column (queue_name)" - queue_names = [row[0] for row in results] + assert len(row) == 1, "Each row should have only one column (queue_machine)" + queue_machines = [row[0] for row in results] state = {} - for queue_name in queue_names: - disable_requests = self.get_disable_requests(queue_name) - state[queue_name] = { + for queue_machine in queue_machines: + disable_requests = self.get_disable_requests(queue_machine) + state[queue_machine] = { "disable_requests": [dr.as_dict() for dr in disable_requests.values()] } return state + def get_queue_machines(self, queue_machine: Union[QueueMachineId, QueueId]) -> List[QueueMachineId]: + if is_queue_machine_id(queue_machine): + return [queue_machine] + else: + status = self.grid_engine.get_status() + return [q for q in status.is_enabled if q.startswith(queue_machine)] + def main(): - parser = argparse.ArgumentParser(description="qmod wrapper to manage queue states with a counter and logging.") + parser = argparse.ArgumentParser(description="qmod wrapper to manage queue states with a counter and logging.", epilog="Example usage: quman d main.q --disable-tag admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'") parser.add_argument("action", choices=["d", "e"], help="Action: d (deactivate) or e (activate)") - parser.add_argument("queue", help="Queue to modify (e.g., main.q@node42@univ-rennes.fr)") + parser.add_argument("queue", help="Queue to modify (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)") parser.add_argument("--reason", required=True, help="Reason for the deactivation/activation") parser.add_argument("--disable-tag", required=True, help="tag for the disable request (e.g., auto.croconaus, manual.graffy, etc.)") args = parser.parse_args() db_backend = create_db_backend() - qmod = QueueManager(db_backend, Sge(dry_run=False)) # set dry_run to False to actually run qmod commands + quman = QueueManager(db_backend, Sge(dry_run=False)) # set dry_run to False to actually run qmod commands - queue = args.queue + quman.synchronize_with_grid_engine() + queue_machines = quman.get_queue_machines(args.queue) if args.action == "d": - qmod.request_queue_deactivation(queue, args.disable_tag, args.reason) + quman.request_queue_machines_deactivation(queue_machines, args.disable_tag, args.reason) elif args.action == "e": - qmod.request_queue_activation(queue, args.disable_tag, args.reason) + quman.request_queue_machines_activation(queue_machines, args.disable_tag, args.reason) if __name__ == "__main__": diff --git a/cocluto/version.py b/cocluto/version.py index bf84868..a3afd0e 100644 --- a/cocluto/version.py +++ b/cocluto/version.py @@ -1,4 +1,4 @@ -__version__ = '1.0.13' +__version__ = '1.0.14' class Version(object): diff --git a/test/test_quman.py b/test/test_quman.py index acfbd8a..31ef335 100644 --- a/test/test_quman.py +++ b/test/test_quman.py @@ -27,7 +27,7 @@ class QumanTestCase(unittest.TestCase): qs.add_queue(f'main.q@alambix{node_id}', True) qs.add_queue('gpuonly.q@alambix42', True) grid_engine = MockGridEngine(qs) - grid_engine.disable_queue('main.q@alambix42') # simulate that the queue is already disabled) + grid_engine.disable_queue_machine('main.q@alambix42') # simulate that the queue is already disabled) quman = QueueManager(db_backend, grid_engine) print('queues state:') grid_engine.queues_status.print() @@ -39,12 +39,17 @@ class QumanTestCase(unittest.TestCase): grid_engine.queues_status.print() print('disable requests:') print(json.dumps(quman.get_state_as_json(), indent=2)) - quman.request_queue_deactivation('main.q@alambix42', 'sysadmin.graffy', 'disabled to move the alambix42 to another rack') + quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'disabled to move the alambix42 to another rack') with self.assertRaises(AssertionError): # attempting to disable the same queue again with the same disable tag should raise an assertion error (the tag is used to uniquely identify the disables on the machine) - quman.request_queue_deactivation('main.q@alambix42', 'sysadmin.graffy', 'because I want to test quman') - quman.request_queue_deactivation('main.q@alambix42', 'croconaus.maco-update', 'disabled to update maco') - quman.request_queue_activation('main.q@alambix42', 'sysadmin.graffy', 'alambix42 has been moved to a new rack') + quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'because I want to test quman') + quman.request_queue_machines_deactivation(['main.q@alambix42'], 'croconaus.maco-update', 'disabled to update maco') + quman.request_queue_machines_activation(['main.q@alambix42'], 'sysadmin.graffy', 'alambix42 has been moved to a new rack') + main_queue_machines = quman.get_queue_machines('main.q') + quman.request_queue_machines_deactivation(main_queue_machines, 'sysadmin.graffy.bug4242', 'disable all cluster to prepare complete shutdown') + main_queue_machines = quman.get_queue_machines('main.q') + quman.request_queue_machines_activation(main_queue_machines, 'sysadmin.graffy.bug4242', 'electricity is back, reactivating all cluster') + db_backend.dump(Path('./quman_test/quman_dump.sql')) print('queues state:') grid_engine.queues_status.print()