cocluto v1.0.14 - added support for full queues to quman

With this feature, it is now possible to disable an entire queue (and not only a single queue machine) with:

```sh
quman d main.q --disable-tag admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'
```
-  also replaced the term queue name with queue machine for clarity, now that we have both QueueMachineId and QueueId

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
This commit is contained in:
Guillaume Raffy 2026-04-03 11:58:47 +02:00
parent bfe1eeb084
commit 4cc541d9c3
3 changed files with 140 additions and 85 deletions

View File

@ -1,8 +1,10 @@
#!/usr/bin/env python3
from abc import ABC, abstractmethod
from typing import Dict, Any
from typing import List, Dict, Any, Union
import logging
import subprocess
import argparse
import re
from datetime import datetime
from pathlib import Path
from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField # , SqlSshAccessedMysqlDb
@ -11,33 +13,40 @@ from cocluto.ClusterController.JobsState import JobsState
LogId = int # identifies a log entry in the database
DisableTag = str # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc.
QueueId = str # identifies the queue eg main.q
QueueMachineId = str # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr
HostFqdn = str # fully qualified domain name of a host eg alambix42.ipr.univ-rennes.fr
UserId = str # unix user id of a user eg graffy
def is_queue_machine_id(s: str) -> bool:
    """Return True if the given string looks like a QueueMachineId, False otherwise.

    A QueueMachineId has the form ``<queue>@<host>`` (eg
    ``main.q@alambix42.ipr.univ-rennes.fr``), whereas a plain QueueId
    (eg ``main.q``) has no ``@<host>`` part.

    Both parts must be non-empty and may only contain letters, digits,
    ``_``, ``.`` and ``-``.
    """
    # '-' is accepted because it is legal in host names
    # (eg alambix-master.ipr.univ-rennes.fr); without it such a queue machine
    # would wrongly be treated as a queue name.
    # fullmatch is used rather than match + '$' because '$' also matches just
    # before a trailing newline, which would let 'main.q@host\n' through.
    return re.fullmatch(r'[a-zA-Z0-9_.\-]+@[a-zA-Z0-9_.\-]+', s) is not None
class QueuesStatus():
is_enabled: Dict[QueueMachineId, bool]
def __init__(self):
self.is_enabled = {}
def add_queue(self, queue_name: QueueMachineId, is_enabled: bool):
self.is_enabled[queue_name] = is_enabled
def add_queue(self, queue_machine: QueueMachineId, is_enabled: bool):
self.is_enabled[queue_machine] = is_enabled
def print(self):
for queue_name, is_enabled in self.is_enabled.items():
print(f"{queue_name}: {'enabled' if is_enabled else 'disabled'}")
for queue_machine, is_enabled in self.is_enabled.items():
print(f"{queue_machine}: {'enabled' if is_enabled else 'disabled'}")
class IGridEngine(ABC):
@abstractmethod
def disable_queue(self, queue_name: QueueMachineId):
def disable_queue_machine(self, queue_machine: QueueMachineId):
pass
@abstractmethod
def enable_queue(self, queue_name: QueueMachineId):
def enable_queue_machine(self, queue_machine: QueueMachineId):
pass
@abstractmethod
@ -51,17 +60,17 @@ class MockGridEngine(IGridEngine):
def __init__(self, queues_status: QueuesStatus):
self.queues_status = queues_status
def disable_queue(self, queue_name: QueueMachineId):
print(f"Mock disable queue {queue_name}")
assert queue_name in self.queues_status.is_enabled, f"Queue {queue_name} not found in queues status"
assert self.queues_status.is_enabled[queue_name], f"Queue {queue_name} is already disabled"
self.queues_status.is_enabled[queue_name] = False
def disable_queue_machine(self, queue_machine: QueueMachineId):
print(f"Mock disable queue {queue_machine}")
assert queue_machine in self.queues_status.is_enabled, f"Queue {queue_machine} not found in queues status"
assert self.queues_status.is_enabled[queue_machine], f"Queue {queue_machine} is already disabled"
self.queues_status.is_enabled[queue_machine] = False
def enable_queue(self, queue_name: QueueMachineId):
print(f"Mock enable queue {queue_name}")
assert queue_name in self.queues_status.is_enabled, f"Queue {queue_name} not found in queues status"
assert not self.queues_status.is_enabled[queue_name], f"Queue {queue_name} is already enabled"
self.queues_status.is_enabled[queue_name] = True
def enable_queue_machine(self, queue_machine: QueueMachineId):
print(f"Mock enable queue {queue_machine}")
assert queue_machine in self.queues_status.is_enabled, f"Queue machine {queue_machine} not found in queues status"
assert not self.queues_status.is_enabled[queue_machine], f"Queue machine {queue_machine} is already enabled"
self.queues_status.is_enabled[queue_machine] = True
def get_status(self) -> QueuesStatus:
return self.queues_status
@ -81,11 +90,11 @@ class Sge(IGridEngine):
else:
subprocess.run(cmd, check=True)
def disable_queue(self, queue_name: QueueMachineId):
self.run_qmod(["-d", queue_name])
def disable_queue_machine(self, queue_machine: QueueMachineId):
self.run_qmod(["-d", queue_machine])
def enable_queue(self, queue_name: QueueMachineId):
self.run_qmod(["-e", queue_name])
def enable_queue_machine(self, queue_machine: QueueMachineId):
self.run_qmod(["-e", queue_machine])
def get_status(self) -> QueuesStatus:
process = subprocess.run(['qstat', '-f', '-u', '*'], check=True, capture_output=True)
@ -102,28 +111,35 @@ class Sge(IGridEngine):
def init_db(db_backend: ISqlDatabaseBackend):
# a database storing the log of actions (queue activation or deactivation)
# a table storing the log of actions (queue activation or deactivation)
if not db_backend.table_exists('log'):
fields = [
SqlTableField('id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the modification', is_autoinc_index=True),
SqlTableField('timestamp', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this modification has been made'),
SqlTableField('user_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the id of user performing the action (unix user id)'),
SqlTableField('host_fqdn', SqlTableField.Type.FIELD_TYPE_STRING, 'the fully qualified domain name of the host on which the action is performed'),
SqlTableField('queue_name', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue that was modified'),
SqlTableField('queue_machines', SqlTableField.Type.FIELD_TYPE_STRING, 'the comma separated list of queue machines that were modified'),
SqlTableField('action', SqlTableField.Type.FIELD_TYPE_STRING, 'the action performed: "disable" or "enable"'),
SqlTableField('disable_tag', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'),
SqlTableField('reason', SqlTableField.Type.FIELD_TYPE_STRING, 'the reason for the modification'),
]
db_backend.create_table('log', fields)
# a database storing the current disable requests
# a table storing the current disable requests
if not db_backend.table_exists('disables'):
fields = [
SqlTableField('disable_request_id', SqlTableField.Type.FIELD_TYPE_INT, 'log.id of the disable action that led to this state'),
SqlTableField('queue_name', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue that was modified'),
SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue machine that was disabled'),
]
db_backend.create_table('disables', fields)
# a table storing the current queues
if not db_backend.table_exists('queues'):
fields = [
SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue'),
]
db_backend.create_table('queues', fields)
def create_db_backend() -> ISqlDatabaseBackend:
# db_server_fqdn = 'alambix-master.ipr.univ-rennes.fr'
@ -142,16 +158,16 @@ class DisableRequest:
db_id: int # index of this disable request in the database
user_id: UserId # the id of the user who requested the disable action (unix user id)
host_fqdn: HostFqdn # the fully qualified domain name of the host on which the disable action was requested
queue_name: QueueMachineId # the name of the queue that was
queue_machines: List[QueueMachineId] # the names of the queue machines that were modified
reason: str
disable_tag: DisableTag
timestamp: datetime
def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_name: QueueMachineId, reason: str, disable_tag: DisableTag, timestamp: datetime):
def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_machines: List[QueueMachineId], reason: str, disable_tag: DisableTag, timestamp: datetime):
self.log_id = log_id
self.user_id = user_id
self.host_fqdn = host_fqdn
self.queue_name = queue_name
self.queue_machines = queue_machines
self.reason = reason
self.disable_tag = disable_tag
self.timestamp = timestamp
@ -161,7 +177,7 @@ class DisableRequest:
"log_id": self.log_id,
"user_id": self.user_id,
"host_fqdn": self.host_fqdn,
"queue_name": self.queue_name,
"queue_machines": self.queue_machines,
"reason": self.reason,
"disable_tag": self.disable_tag,
"timestamp": self.timestamp.isoformat(),
@ -176,115 +192,149 @@ class QueueManager:
self.db_backend = db_backend
self.grid_engine = grid_engine
def log_modification(self, queue_name: str, action: str, disable_tag: DisableTag, reason: str) -> LogId:
def log_modification(self, queue_machines: List[QueueMachineId], action: str, disable_tag: DisableTag, reason: str) -> LogId:
assert isinstance(queue_machines, list)
assert action in ["disable", "enable"], "Action must be either 'disable' or 'enable'"
userid = subprocess.check_output(['whoami']).decode().strip()
host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip()
timestamp = datetime.now().isoformat()
sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_name, action, disable_tag, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{queue_name}', '{action}', '{disable_tag}', '{reason}');"
sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_tag, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{','.join(queue_machines)}', '{action}', '{disable_tag}', '{reason}');"
self.db_backend.query(sql_query)
# get the log id of the disable action that was just inserted
log_id = self.db_backend.query("SELECT last_insert_rowid();")[0][0]
return log_id
def get_disable_requests(self, queue_name: QueueMachineId) -> Dict[int, DisableRequest]:
sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_name, log.reason, log.disable_tag, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_name = '{queue_name}' AND log.action = 'disable';"
def get_disable_requests(self, queue_machine: QueueMachineId) -> Dict[int, DisableRequest]:
sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_machines, log.reason, log.disable_tag, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_machine = '{queue_machine}' AND log.action = 'disable';"
results = self.db_backend.query(sql_query)
disable_requests_as_dict = {}
for row in results:
log_id = row[0]
user_id = row[1]
host_fqdn = row[2]
queue_machine = row[3]
assert queue_machine == queue_name, "All results should be for the same queue"
queue_machines = row[3].split(',')
assert queue_machine == queue_machine, "All results should be for the same queue"
reason = row[4]
disable_tag = row[5]
timestamp = datetime.fromisoformat(row[6])
disable_requests_as_dict[row[0]] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_name=queue_name, reason=reason, disable_tag=disable_tag, timestamp=timestamp)
disable_requests_as_dict[row[0]] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_tag=disable_tag, timestamp=timestamp)
return disable_requests_as_dict
def request_queue_deactivation(self, queue_name: QueueMachineId, disable_tag: DisableTag, reason: str, perform_disable: bool = True):
def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_tag: DisableTag, reason: str, perform_disable: bool = True):
disable_requests = self.get_disable_requests(queue_name)
for dr in disable_requests.values():
assert dr.disable_tag != disable_tag, f"Disable tag {disable_tag} has already requested deactivation of queue {queue_name} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first."
all_disable_requests = {}
for queue_machine in queue_machines:
all_disable_requests[queue_machine] = self.get_disable_requests(queue_machine)
disable_requests = all_disable_requests[queue_machine]
for dr in disable_requests.values():
assert dr.disable_tag != disable_tag, f"Disable tag {disable_tag} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first."
if perform_disable:
if len(disable_requests) == 0:
# queue is currently active, we can disable it
self.grid_engine.disable_queue(queue_name)
for queue_machine in queue_machines:
if perform_disable:
disable_requests = all_disable_requests[queue_machine]
if len(disable_requests) == 0:
# queue is currently active, we can disable it
self.grid_engine.disable_queue_machine(queue_machine)
disable_log_id = self.log_modification(queue_name, "disable", disable_tag, reason)
self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_name) VALUES ({disable_log_id}, '{queue_name}');")
disable_log_id = self.log_modification(queue_machines, "disable", disable_tag, reason)
for queue_machine in queue_machines:
self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_machine) VALUES ({disable_log_id}, '{queue_machine}');")
def request_queue_activation(self, queue_name: QueueMachineId, disable_tag: DisableTag, reason: str, perform_enable: bool = True):
disable_requests = self.get_disable_requests(queue_name)
dr_to_remove = None # the disable reason to remove
for dr in disable_requests.values():
if dr.disable_tag == disable_tag:
dr_to_remove = dr
break
def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_tag: DisableTag, reason: str, perform_enable: bool = True):
for queue_machine in queue_machines:
disable_requests = self.get_disable_requests(queue_machine)
dr_to_remove = None # the disable reason to remove
for dr in disable_requests.values():
if dr.disable_tag == disable_tag:
dr_to_remove = dr
break
assert dr_to_remove is not None, f"Disable tag {disable_tag} has not requested deactivation of queue {queue_name}. Cannot request activation without a prior deactivation."
assert dr_to_remove is not None, f"Disable tag {disable_tag} has not requested deactivation of queue {queue_machine}. Cannot request activation without a prior deactivation."
if perform_enable:
if len(disable_requests) == 1:
# queue is currently disabled and there is only one disable reason, we can enable it
self.grid_engine.enable_queue(queue_name)
enable_log_id = self.log_modification(queue_name, "enable", disable_tag, reason) # noqa: F841 pylint:disable=unused-variable
self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_name = '{queue_name}';")
if perform_enable:
if len(disable_requests) == 1:
# queue is currently disabled and there is only one disable reason, we can enable it
self.grid_engine.enable_queue_machine(queue_machine)
enable_log_id = self.log_modification(queue_machines, "enable", disable_tag, reason) # noqa: F841 pylint:disable=unused-variable
for queue_machine in queue_machines:
self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';")
def synchronize_with_grid_engine(self):
"""synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat."""
qs = self.grid_engine.get_status()
for queue_name, is_enabled in qs.is_enabled.items():
disable_requests = self.get_disable_requests(queue_name)
db_queues = set()
sql_query = "SELECT queue_machine FROM queues;"
results = self.db_backend.query(sql_query)
for row in results:
assert len(row) == 1, "Each row should have only one column (queue_machine)"
db_queues.add(row[0])
for queue_machine, is_enabled in qs.is_enabled.items():
if queue_machine not in db_queues:
# if the queue is not in the database, we add it
logging.warning(f"Queue {queue_machine} is not in the database, adding it.")
self.db_backend.query(f"INSERT INTO queues (queue_machine) VALUES ('{queue_machine}');")
else:
db_queues.remove(queue_machine) # we remove it from the set of queues in the database to keep track of the queues that are in the database but not in the grid engine
disable_requests = self.get_disable_requests(queue_machine)
if not is_enabled and len(disable_requests) == 0:
# queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_tag "unknown" and reason "synchronized with grid engine"
self.request_queue_deactivation(queue_name, "quman-sync", "synchronized with grid engine", perform_disable=False)
assert len(self.get_disable_requests(queue_name)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_name} but there are none."
self.request_queue_machines_deactivation([queue_machine], "quman-sync", "synchronized with grid engine", perform_disable=False)
assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none."
elif is_enabled and len(disable_requests) > 0:
# queue is enabled in the grid engine but there are disable reasons in the database, we remove all disable reasons for this queue and disable_tag "unknown" with reason "synchronized with grid engine"
# queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_tag "unknown" with reason "synchronized with grid engine"
for dr in disable_requests.values():
self.request_queue_activation(queue_name, dr.disable_tag, "synchronized with grid engine", perform_enable=False)
assert len(self.get_disable_requests(queue_name)) == 0, f"After synchronization, there should be no disable reasons for queue {queue_name} but there are still {len(self.get_disable_requests(queue_name))} disable reasons."
self.request_queue_machines_activation([queue_machine], dr.disable_tag, "synchronized with grid engine", perform_enable=False)
assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests."
for queue_machine in db_queues:
logging.warning(f"Queue {queue_machine} is in the database but not in the grid engine.")
self.db_backend.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';")
def get_state_as_json(self) -> Dict[str, Any]:
"""returns the state of the queues as a json string."""
# get the list of queue names from the disables table in the database
sql_query = "SELECT DISTINCT queue_name FROM disables;"
sql_query = "SELECT DISTINCT queue_machine FROM disables;"
results = self.db_backend.query(sql_query)
for row in results:
assert len(row) == 1, "Each row should have only one column (queue_name)"
queue_names = [row[0] for row in results]
assert len(row) == 1, "Each row should have only one column (queue_machine)"
queue_machines = [row[0] for row in results]
state = {}
for queue_name in queue_names:
disable_requests = self.get_disable_requests(queue_name)
state[queue_name] = {
for queue_machine in queue_machines:
disable_requests = self.get_disable_requests(queue_machine)
state[queue_machine] = {
"disable_requests": [dr.as_dict() for dr in disable_requests.values()]
}
return state
def get_queue_machines(self, queue_machine: Union[QueueMachineId, QueueId]) -> List[QueueMachineId]:
    """Expand a queue designation into the list of queue machines it refers to.

    :param queue_machine: either a QueueMachineId (eg ``main.q@alambix42``),
        which is returned as a single-element list without querying the grid
        engine, or a QueueId (eg ``main.q``), which is expanded to every queue
        machine of that queue reported by ``self.grid_engine.get_status()``.
    :return: the matching queue machines; empty if the grid engine reports no
        queue machine for the given queue.
    """
    if is_queue_machine_id(queue_machine):
        return [queue_machine]
    status = self.grid_engine.get_status()
    # Compare the part before '@' exactly instead of using a raw prefix test:
    # with startswith(), 'main.q' would also select 'main.q2@host' and the
    # caller would then disable machines of an unrelated queue.
    return [q for q in status.is_enabled if q.partition('@')[0] == queue_machine]
def main():
parser = argparse.ArgumentParser(description="qmod wrapper to manage queue states with a counter and logging.")
parser = argparse.ArgumentParser(description="qmod wrapper to manage queue states with a counter and logging.", epilog="Example usage: quman d main.q --disable-tag admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'")
parser.add_argument("action", choices=["d", "e"], help="Action: d (deactivate) or e (activate)")
parser.add_argument("queue", help="Queue to modify (e.g., main.q@node42@univ-rennes.fr)")
parser.add_argument("queue", help="Queue to modify (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)")
parser.add_argument("--reason", required=True, help="Reason for the deactivation/activation")
parser.add_argument("--disable-tag", required=True, help="tag for the disable request (e.g., auto.croconaus, manual.graffy, etc.)")
args = parser.parse_args()
db_backend = create_db_backend()
qmod = QueueManager(db_backend, Sge(dry_run=False)) # set dry_run to False to actually run qmod commands
quman = QueueManager(db_backend, Sge(dry_run=False)) # set dry_run to False to actually run qmod commands
queue = args.queue
quman.synchronize_with_grid_engine()
queue_machines = quman.get_queue_machines(args.queue)
if args.action == "d":
qmod.request_queue_deactivation(queue, args.disable_tag, args.reason)
quman.request_queue_machines_deactivation(queue_machines, args.disable_tag, args.reason)
elif args.action == "e":
qmod.request_queue_activation(queue, args.disable_tag, args.reason)
quman.request_queue_machines_activation(queue_machines, args.disable_tag, args.reason)
if __name__ == "__main__":

View File

@ -1,4 +1,4 @@
__version__ = '1.0.13'
__version__ = '1.0.14'
class Version(object):

View File

@ -27,7 +27,7 @@ class QumanTestCase(unittest.TestCase):
qs.add_queue(f'main.q@alambix{node_id}', True)
qs.add_queue('gpuonly.q@alambix42', True)
grid_engine = MockGridEngine(qs)
grid_engine.disable_queue('main.q@alambix42') # simulate that the queue is already disabled)
grid_engine.disable_queue_machine('main.q@alambix42') # simulate that the queue is already disabled)
quman = QueueManager(db_backend, grid_engine)
print('queues state:')
grid_engine.queues_status.print()
@ -39,12 +39,17 @@ class QumanTestCase(unittest.TestCase):
grid_engine.queues_status.print()
print('disable requests:')
print(json.dumps(quman.get_state_as_json(), indent=2))
quman.request_queue_deactivation('main.q@alambix42', 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
with self.assertRaises(AssertionError):
# attempting to disable the same queue again with the same disable tag should raise an assertion error (the tag is used to uniquely identify the disables on the machine)
quman.request_queue_deactivation('main.q@alambix42', 'sysadmin.graffy', 'because I want to test quman')
quman.request_queue_deactivation('main.q@alambix42', 'croconaus.maco-update', 'disabled to update maco')
quman.request_queue_activation('main.q@alambix42', 'sysadmin.graffy', 'alambix42 has been moved to a new rack')
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'because I want to test quman')
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'croconaus.maco-update', 'disabled to update maco')
quman.request_queue_machines_activation(['main.q@alambix42'], 'sysadmin.graffy', 'alambix42 has been moved to a new rack')
main_queue_machines = quman.get_queue_machines('main.q')
quman.request_queue_machines_deactivation(main_queue_machines, 'sysadmin.graffy.bug4242', 'disable all cluster to prepare complete shutdown')
main_queue_machines = quman.get_queue_machines('main.q')
quman.request_queue_machines_activation(main_queue_machines, 'sysadmin.graffy.bug4242', 'electricity is back, reactivating all cluster')
db_backend.dump(Path('./quman_test/quman_dump.sql'))
print('queues state:')
grid_engine.queues_status.print()