From bfe1eeb0846465cb6da0c2e50290ccf221be67d0 Mon Sep 17 00:00:00 2001 From: Guillaume Raffy Date: Thu, 2 Apr 2026 19:31:31 +0200 Subject: [PATCH] cocluto v1.0.13 - improvements to quman - the userid and the host fqdn are now recorded - found better names (more appropriate and easier to understand) - disable reason -> disable request - requester -> disable tag work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093] --- cocluto/quman.py | 135 ++++++++++++++++++++++++++++----------------- cocluto/version.py | 2 +- 2 files changed, 85 insertions(+), 52 deletions(-) diff --git a/cocluto/quman.py b/cocluto/quman.py index 08d5f81..29d1269 100755 --- a/cocluto/quman.py +++ b/cocluto/quman.py @@ -10,8 +10,10 @@ from cocluto.ClusterController.QstatParser import QstatParser from cocluto.ClusterController.JobsState import JobsState LogId = int # identifies a log entry in the database -RequesterId = str # identifies the queue enable/disable requester eg auto.croconaus, manual.graffy, etc. +DisableTag = str # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc. 
QueueMachineId = str # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr +HostFqdn = str # fully qualified domain name of a host eg alambix42.ipr.univ-rennes.fr +UserId = str # unix user id of a user eg graffy class QueuesStatus(): @@ -90,32 +92,37 @@ class Sge(IGridEngine): # Parse the output to extract queue statuses # This is a simplified example - you would need to parse the actual qstat output queues_status = QueuesStatus() - jobs_state: JobsState = QstatParser.parseQstatOutput(process.stdout.decode()) - queue_machines = jobs_state.getQueueMachines() - for queue_machine in queue_machines.itervalues(): + qstat_parser = QstatParser() + jobs_state: JobsState = qstat_parser.parse_qstat_output(process.stdout.decode()) + queue_machines = jobs_state.get_queue_machines() + for queue_machine in queue_machines.values(): queues_status.add_queue(queue_machine.get_name(), queue_machine.is_enabled()) return queues_status def init_db(db_backend: ISqlDatabaseBackend): + # a database storing the log of actions (queue activation or deactivation) if not db_backend.table_exists('log'): fields = [ SqlTableField('id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the modification', is_autoinc_index=True), SqlTableField('timestamp', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this modification has been made'), + SqlTableField('user_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the id of user performing the action (unix user id)'), + SqlTableField('host_fqdn', SqlTableField.Type.FIELD_TYPE_STRING, 'the fully qualified domain name of the host on which the action is performed'), SqlTableField('queue_name', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue that was modified'), SqlTableField('action', SqlTableField.Type.FIELD_TYPE_STRING, 'the action performed: "disable" or "enable"'), - SqlTableField('requester_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the ID of the requester'), + SqlTableField('disable_tag', 
SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'), SqlTableField('reason', SqlTableField.Type.FIELD_TYPE_STRING, 'the reason for the modification'), ] db_backend.create_table('log', fields) - if not db_backend.table_exists('state'): + # a database storing the current disable requests + if not db_backend.table_exists('disables'): fields = [ - SqlTableField('disable_reason_id', SqlTableField.Type.FIELD_TYPE_INT, 'log.id of the disable action that led to this state'), + SqlTableField('disable_request_id', SqlTableField.Type.FIELD_TYPE_INT, 'log.id of the disable action that led to this state'), SqlTableField('queue_name', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue that was modified'), ] - db_backend.create_table('state', fields) + db_backend.create_table('disables', fields) def create_db_backend() -> ISqlDatabaseBackend: @@ -131,19 +138,35 @@ def create_db_backend() -> ISqlDatabaseBackend: return backend -class DisableReason: - db_id: int +class DisableRequest: + db_id: int # index of this disable request in the database + user_id: UserId # the id of the user who requested the disable action (unix user id) + host_fqdn: HostFqdn # the fully qualified domain name of the host on which the disable action was requested + queue_name: QueueMachineId # the name of the queue that was reason: str - requester_id: RequesterId + disable_tag: DisableTag timestamp: datetime - def __init__(self, log_id: int, queue_name: QueueMachineId, reason: str, requester_id: RequesterId, timestamp: datetime): + def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_name: QueueMachineId, reason: str, disable_tag: DisableTag, timestamp: datetime): self.log_id = log_id + self.user_id = user_id + self.host_fqdn = host_fqdn self.queue_name = queue_name self.reason = reason - self.requester_id = requester_id + self.disable_tag = disable_tag self.timestamp = timestamp + 
def as_dict(self) -> Dict[str, Any]: + return { + "log_id": self.log_id, + "user_id": self.user_id, + "host_fqdn": self.host_fqdn, + "queue_name": self.queue_name, + "reason": self.reason, + "disable_tag": self.disable_tag, + "timestamp": self.timestamp.isoformat(), + } + class QueueManager: db_backend: ISqlDatabaseBackend @@ -153,72 +176,83 @@ class QueueManager: self.db_backend = db_backend self.grid_engine = grid_engine - def log_modification(self, queue_name: str, action: str, requester_id: RequesterId, reason: str) -> LogId: + def log_modification(self, queue_name: str, action: str, disable_tag: DisableTag, reason: str) -> LogId: assert action in ["disable", "enable"], "Action must be either 'disable' or 'enable'" + userid = subprocess.check_output(['whoami']).decode().strip() + host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip() timestamp = datetime.now().isoformat() - sql_query = f"INSERT INTO log (timestamp, queue_name, action, requester_id, reason) VALUES ('{timestamp}', '{queue_name}', '{action}', '{requester_id}', '{reason}');" + sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_name, action, disable_tag, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{queue_name}', '{action}', '{disable_tag}', '{reason}');" self.db_backend.query(sql_query) # get the log id of the disable action that was just inserted log_id = self.db_backend.query("SELECT last_insert_rowid();")[0][0] return log_id - def get_disable_reasons(self, queue_name: QueueMachineId) -> Dict[int, DisableReason]: - sql_query = f"SELECT log.id, log.queue_name, log.reason, log.requester_id, log.timestamp FROM log JOIN state ON log.id = state.disable_reason_id WHERE state.queue_name = '{queue_name}' AND log.action = 'disable';" + def get_disable_requests(self, queue_name: QueueMachineId) -> Dict[int, DisableRequest]: + sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_name, log.reason, log.disable_tag, log.timestamp FROM log JOIN 
disables ON log.id = disables.disable_request_id WHERE disables.queue_name = '{queue_name}' AND log.action = 'disable';" results = self.db_backend.query(sql_query) + disable_requests_as_dict = {} for row in results: - assert row[1] == queue_name, "All results should be for the same queue" - return {row[0]: DisableReason(log_id=row[0], queue_name=row[1], reason=row[2], requester_id=row[3], timestamp=datetime.fromisoformat(row[4])) for row in results} + log_id = row[0] + user_id = row[1] + host_fqdn = row[2] + queue_machine = row[3] + assert queue_machine == queue_name, "All results should be for the same queue" + reason = row[4] + disable_tag = row[5] + timestamp = datetime.fromisoformat(row[6]) + disable_requests_as_dict[row[0]] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_name=queue_name, reason=reason, disable_tag=disable_tag, timestamp=timestamp) + return disable_requests_as_dict - def request_queue_deactivation(self, queue_name: QueueMachineId, requester_id: RequesterId, reason: str, perform_disable: bool = True): + def request_queue_deactivation(self, queue_name: QueueMachineId, disable_tag: DisableTag, reason: str, perform_disable: bool = True): - disable_reasons = self.get_disable_reasons(queue_name) - for dr in disable_reasons.values(): - assert dr.requester_id != requester_id, f"Requester {requester_id} has already requested deactivation of queue {queue_name} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first." + disable_requests = self.get_disable_requests(queue_name) + for dr in disable_requests.values(): + assert dr.disable_tag != disable_tag, f"Disable tag {disable_tag} has already requested deactivation of queue {queue_name} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first." 
if perform_disable: - if len(disable_reasons) == 0: + if len(disable_requests) == 0: # queue is currently active, we can disable it self.grid_engine.disable_queue(queue_name) - disable_log_id = self.log_modification(queue_name, "disable", requester_id, reason) - self.db_backend.query(f"INSERT INTO state (disable_reason_id, queue_name) VALUES ({disable_log_id}, '{queue_name}');") + disable_log_id = self.log_modification(queue_name, "disable", disable_tag, reason) + self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_name) VALUES ({disable_log_id}, '{queue_name}');") - def request_queue_activation(self, queue_name: QueueMachineId, requester_id: RequesterId, reason: str, perform_enable: bool = True): - disable_reasons = self.get_disable_reasons(queue_name) + def request_queue_activation(self, queue_name: QueueMachineId, disable_tag: DisableTag, reason: str, perform_enable: bool = True): + disable_requests = self.get_disable_requests(queue_name) dr_to_remove = None # the disable reason to remove - for dr in disable_reasons.values(): - if dr.requester_id == requester_id: + for dr in disable_requests.values(): + if dr.disable_tag == disable_tag: dr_to_remove = dr break - assert dr_to_remove is not None, f"Requester {requester_id} has not requested deactivation of queue {queue_name}. Cannot request activation without a prior deactivation." + assert dr_to_remove is not None, f"Disable tag {disable_tag} has not requested deactivation of queue {queue_name}. Cannot request activation without a prior deactivation." 
if perform_enable: - if len(disable_reasons) == 1: + if len(disable_requests) == 1: # queue is currently disabled and there is only one disable reason, we can enable it self.grid_engine.enable_queue(queue_name) - enable_log_id = self.log_modification(queue_name, "enable", requester_id, reason) # noqa: F841 - self.db_backend.query(f"DELETE FROM state WHERE disable_reason_id = {dr_to_remove.log_id} AND queue_name = '{queue_name}';") + enable_log_id = self.log_modification(queue_name, "enable", disable_tag, reason) # noqa: F841 pylint:disable=unused-variable + self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_name = '{queue_name}';") def synchronize_with_grid_engine(self): """synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat.""" qs = self.grid_engine.get_status() for queue_name, is_enabled in qs.is_enabled.items(): - disable_reasons = self.get_disable_reasons(queue_name) - if not is_enabled and len(disable_reasons) == 0: - # queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with requester_id "unknown" and reason "synchronized with grid engine" + disable_requests = self.get_disable_requests(queue_name) + if not is_enabled and len(disable_requests) == 0: + # queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_tag "unknown" and reason "synchronized with grid engine" self.request_queue_deactivation(queue_name, "quman-sync", "synchronized with grid engine", perform_disable=False) - assert len(self.get_disable_reasons(queue_name)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_name} but there are none." 
- elif is_enabled and len(disable_reasons) > 0: - # queue is enabled in the grid engine but there are disable reasons in the database, we remove all disable reasons for this queue and requester_id "unknown" with reason "synchronized with grid engine" - for dr in disable_reasons.values(): - self.request_queue_activation(queue_name, dr.requester_id, "synchronized with grid engine", perform_enable=False) - assert len(self.get_disable_reasons(queue_name)) == 0, f"After synchronization, there should be no disable reasons for queue {queue_name} but there are still {len(self.get_disable_reasons(queue_name))} disable reasons." + assert len(self.get_disable_requests(queue_name)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_name} but there are none." + elif is_enabled and len(disable_requests) > 0: + # queue is enabled in the grid engine but there are disable reasons in the database, we remove all disable reasons for this queue and disable_tag "unknown" with reason "synchronized with grid engine" + for dr in disable_requests.values(): + self.request_queue_activation(queue_name, dr.disable_tag, "synchronized with grid engine", perform_enable=False) + assert len(self.get_disable_requests(queue_name)) == 0, f"After synchronization, there should be no disable reasons for queue {queue_name} but there are still {len(self.get_disable_requests(queue_name))} disable reasons." 
def get_state_as_json(self) -> Dict[str, Any]: """returns the state of the queues as a json string.""" - # get the list of queue names from the state table in the database - sql_query = "SELECT DISTINCT queue_name FROM state;" + # get the list of queue names from the disables table in the database + sql_query = "SELECT DISTINCT queue_name FROM disables;" results = self.db_backend.query(sql_query) for row in results: assert len(row) == 1, "Each row should have only one column (queue_name)" @@ -226,9 +260,9 @@ class QueueManager: state = {} for queue_name in queue_names: - disable_reasons = self.get_disable_reasons(queue_name) + disable_requests = self.get_disable_requests(queue_name) state[queue_name] = { - "disable_reasons": [{"reason": dr.reason, "requester_id": dr.requester_id, "timestamp": dr.timestamp.isoformat()} for dr in disable_reasons.values()] + "disable_requests": [dr.as_dict() for dr in disable_requests.values()] } return state @@ -239,7 +273,7 @@ def main(): parser.add_argument("action", choices=["d", "e"], help="Action: d (deactivate) or e (activate)") parser.add_argument("queue", help="Queue to modify (e.g., main.q@node42@univ-rennes.fr)") parser.add_argument("--reason", required=True, help="Reason for the deactivation/activation") - parser.add_argument("--requester", required=True, help="ID of the requester") + parser.add_argument("--disable-tag", required=True, help="tag for the disable request (e.g., auto.croconaus, manual.graffy, etc.)") args = parser.parse_args() db_backend = create_db_backend() @@ -248,10 +282,9 @@ def main(): queue = args.queue if args.action == "d": - qmod.request_queue_deactivation(queue, args.requester, args.reason) + qmod.request_queue_deactivation(queue, args.disable_tag, args.reason) elif args.action == "e": - qmod.request_queue_activation(queue, args.requester, args.reason) - qmod.db_backend.dump(Path('./quman_test/log.sql')) + qmod.request_queue_activation(queue, args.disable_tag, args.reason) if __name__ == 
"__main__": diff --git a/cocluto/version.py b/cocluto/version.py index 2fe35b4..bf84868 100644 --- a/cocluto/version.py +++ b/cocluto/version.py @@ -1,4 +1,4 @@ -__version__ = '1.0.12' +__version__ = '1.0.13' class Version(object):