cocluto v1.0.16 - overhauld quman arguments parsing

- quman's argument parsing now uses subparsers to allow for more actions than enable and disable
- added the show-disables action to quman
- added a test mode for the command line quman, that allows the user to test quman using quman's user interface
- renamed disable tag as disable id, as it's more meaningful (identfies a disable request)
- made user errors more user-friendly (ie not polluted with call stack)

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
This commit is contained in:
Guillaume Raffy 2026-04-03 17:01:56 +02:00
parent 25afd32504
commit 04adbbc684
3 changed files with 135 additions and 46 deletions

View File

@ -6,6 +6,7 @@ import logging
import subprocess
import argparse
import re
import json
from datetime import datetime
from pathlib import Path
from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField # , SqlSshAccessedMysqlDb
@ -13,7 +14,7 @@ from cocluto.ClusterController.QstatParser import QstatParser
from cocluto.ClusterController.JobsState import JobsState
LogId = int # identifies a log entry in the database
DisableTag = str # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc.
DisableId = str # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc.
QueueId = str # identifies the queue eg main.q
QueueMachineId = str # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr
HostFqdn = str # fully qualified domain name of a host eg alambix42.ipr.univ-rennes.fr
@ -23,14 +24,20 @@ UserId = str # unix user id of a user eg graffy
def is_queue_machine_id(s: str) -> bool:
"""returns True if the given string is a QueueMachineId (i.e., it contains '@') and False otherwise."""
# if the queue_machine contains '@', we consider it as a QueueMachineId and return it as is
return re.match(r'[a-zA-Z0-9_\.]+@[a-zA-Z0-9_\.]+$', s) is not None
return re.match(r'^[a-zA-Z0-9_\.]+@[a-zA-Z0-9_\-\.]+$', s) is not None
class QueuesStatus():
is_enabled: Dict[QueueMachineId, bool]
def __init__(self):
def __init__(self, queues_status_file_path: Path = None):
self.is_enabled = {}
if queues_status_file_path is not None:
self.load_from_json(queues_status_file_path)
def load_from_json(self, path: Path):
with open(path, 'r', encoding='utf-8') as f:
self.is_enabled = json.load(f)
def add_queue(self, queue_machine: QueueMachineId, is_enabled: bool):
self.is_enabled[queue_machine] = is_enabled
@ -39,6 +46,10 @@ class QueuesStatus():
for queue_machine, is_enabled in self.is_enabled.items():
print(f"{queue_machine}: {'enabled' if is_enabled else 'disabled'}")
def save_as_json(self, path: Path):
with open(path, 'w', encoding='utf-8') as f:
json.dump(self.is_enabled, f, indent=2)
class IGridEngine(ABC):
@ -63,7 +74,7 @@ class MockGridEngine(IGridEngine):
def disable_queue_machine(self, queue_machine: QueueMachineId):
print(f"Mock disable queue {queue_machine}")
assert queue_machine in self.queues_status.is_enabled, f"Queue {queue_machine} not found in queues status"
assert queue_machine in self.queues_status.is_enabled, f"Queue {queue_machine} not found in queues status {self.queues_status.is_enabled}"
assert self.queues_status.is_enabled[queue_machine], f"Queue {queue_machine} is already disabled"
self.queues_status.is_enabled[queue_machine] = False
@ -86,8 +97,8 @@ class Sge(IGridEngine):
# check that qmod command is available
try:
subprocess.run(["qmod", "-h"], check=True, capture_output=True)
except FileNotFoundError:
raise RuntimeError("qmod command not found. Please make sure that the grid engine client is installed and qmod command is available in the PATH.")
except FileNotFoundError as exc:
raise RuntimeError("qmod command not found. Please make sure that the grid engine client is installed and qmod command is available in the PATH.") from exc
def run_qmod(self, args):
"""runs qmod with the given arguments."""
@ -98,7 +109,7 @@ class Sge(IGridEngine):
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"qmod command failed: {e}")
raise RuntimeError(f"qmod command failed: {e}") from e
def disable_queue_machine(self, queue_machine: QueueMachineId):
self.run_qmod(["-d", queue_machine])
@ -130,7 +141,7 @@ def init_db(db_backend: ISqlDatabaseBackend):
SqlTableField('host_fqdn', SqlTableField.Type.FIELD_TYPE_STRING, 'the fully qualified domain name of the host on which the action is performed'),
SqlTableField('queue_machines', SqlTableField.Type.FIELD_TYPE_STRING, 'the comma separated list of queue machines that were modified'),
SqlTableField('action', SqlTableField.Type.FIELD_TYPE_STRING, 'the action performed: "disable" or "enable"'),
SqlTableField('disable_tag', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'),
SqlTableField('disable_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'),
SqlTableField('reason', SqlTableField.Type.FIELD_TYPE_STRING, 'the reason for the modification'),
]
db_backend.create_table('log', fields)
@ -170,16 +181,16 @@ class DisableRequest:
host_fqdn: HostFqdn # the fully qualified domain name of the host on which the disable action was requested
queue_machines: List[QueueMachineId] # the names of the queue machines that were modified
reason: str
disable_tag: DisableTag
disable_id: DisableId
timestamp: datetime
def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_machines: List[QueueMachineId], reason: str, disable_tag: DisableTag, timestamp: datetime):
def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_machines: List[QueueMachineId], reason: str, disable_id: DisableId, timestamp: datetime):
self.log_id = log_id
self.user_id = user_id
self.host_fqdn = host_fqdn
self.queue_machines = queue_machines
self.reason = reason
self.disable_tag = disable_tag
self.disable_id = disable_id
self.timestamp = timestamp
def as_dict(self) -> Dict[str, Any]:
@ -189,7 +200,7 @@ class DisableRequest:
"host_fqdn": self.host_fqdn,
"queue_machines": self.queue_machines,
"reason": self.reason,
"disable_tag": self.disable_tag,
"disable_id": self.disable_id,
"timestamp": self.timestamp.isoformat(),
}
@ -204,20 +215,20 @@ class QueueManager:
grid_engine = Sge()
self.grid_engine = grid_engine
def log_modification(self, queue_machines: List[QueueMachineId], action: str, disable_tag: DisableTag, reason: str) -> LogId:
def log_modification(self, queue_machines: List[QueueMachineId], action: str, disable_id: DisableId, reason: str) -> LogId:
assert isinstance(queue_machines, list)
assert action in ["disable", "enable"], "Action must be either 'disable' or 'enable'"
userid = subprocess.check_output(['whoami']).decode().strip()
host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip()
timestamp = datetime.now().isoformat()
sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_tag, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{','.join(queue_machines)}', '{action}', '{disable_tag}', '{reason}');"
sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_id, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{','.join(queue_machines)}', '{action}', '{disable_id}', '{reason}');"
self.db_backend.query(sql_query)
# get the log id of the disable action that was just inserted
log_id = self.db_backend.query("SELECT last_insert_rowid();")[0][0]
return log_id
def get_disable_requests(self, queue_machine: QueueMachineId) -> Dict[int, DisableRequest]:
sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_machines, log.reason, log.disable_tag, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_machine = '{queue_machine}' AND log.action = 'disable';"
sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_machines, log.reason, log.disable_id, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_machine = '{queue_machine}' AND log.action = 'disable';"
results = self.db_backend.query(sql_query)
disable_requests_as_dict = {}
for row in results:
@ -227,19 +238,20 @@ class QueueManager:
queue_machines = row[3].split(',')
assert queue_machine == queue_machine, "All results should be for the same queue"
reason = row[4]
disable_tag = row[5]
disable_id = row[5]
timestamp = datetime.fromisoformat(row[6])
disable_requests_as_dict[row[0]] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_tag=disable_tag, timestamp=timestamp)
disable_requests_as_dict[row[0]] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_id=disable_id, timestamp=timestamp)
return disable_requests_as_dict
def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_tag: DisableTag, reason: str, perform_disable: bool = True):
def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_disable: bool = True):
logging.info("Requesting deactivation of queue machines %s with disable id '%s' for reason: %s", queue_machines, disable_id, reason)
all_disable_requests = {}
for queue_machine in queue_machines:
all_disable_requests[queue_machine] = self.get_disable_requests(queue_machine)
disable_requests = all_disable_requests[queue_machine]
for dr in disable_requests.values():
assert dr.disable_tag != disable_tag, f"Disable tag {disable_tag} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first."
if dr.disable_id == disable_id:
raise RuntimeError(f"Disable id {disable_id} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first.")
for queue_machine in queue_machines:
if perform_disable:
@ -248,26 +260,27 @@ class QueueManager:
# queue is currently active, we can disable it
self.grid_engine.disable_queue_machine(queue_machine)
disable_log_id = self.log_modification(queue_machines, "disable", disable_tag, reason)
disable_log_id = self.log_modification(queue_machines, "disable", disable_id, reason)
for queue_machine in queue_machines:
self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_machine) VALUES ({disable_log_id}, '{queue_machine}');")
def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_tag: DisableTag, reason: str, perform_enable: bool = True):
def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_enable: bool = True):
for queue_machine in queue_machines:
disable_requests = self.get_disable_requests(queue_machine)
dr_to_remove = None # the disable reason to remove
for dr in disable_requests.values():
if dr.disable_tag == disable_tag:
if dr.disable_id == disable_id:
dr_to_remove = dr
break
assert dr_to_remove is not None, f"Disable tag {disable_tag} has not requested deactivation of queue {queue_machine}. Cannot request activation without a prior deactivation."
if dr_to_remove is None:
raise RuntimeError(f"Disable id {disable_id} has not requested deactivation of queue {queue_machine}. Cannot request activation without a prior deactivation.")
if perform_enable:
if len(disable_requests) == 1:
# queue is currently disabled and there is only one disable reason, we can enable it
self.grid_engine.enable_queue_machine(queue_machine)
enable_log_id = self.log_modification(queue_machines, "enable", disable_tag, reason) # noqa: F841 pylint:disable=unused-variable
enable_log_id = self.log_modification(queue_machines, "enable", disable_id, reason) # noqa: F841 pylint:disable=unused-variable
for queue_machine in queue_machines:
self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';")
@ -285,23 +298,24 @@ class QueueManager:
for queue_machine, is_enabled in qs.is_enabled.items():
if queue_machine not in db_queues:
# if the queue is not in the database, we add it
logging.warning(f"Queue {queue_machine} is not in the database, adding it.")
logging.warning("Queue %s is not in the database, adding it.", queue_machine)
self.db_backend.query(f"INSERT INTO queues (queue_machine) VALUES ('{queue_machine}');")
else:
db_queues.remove(queue_machine) # we remove it from the set of queues in the database to keep track of the queues that are in the database but not in the grid engine
disable_requests = self.get_disable_requests(queue_machine)
if not is_enabled and len(disable_requests) == 0:
# queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_tag "unknown" and reason "synchronized with grid engine"
# queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_id "unknown" and reason "synchronized with grid engine"
self.request_queue_machines_deactivation([queue_machine], "quman-sync", "synchronized with grid engine", perform_disable=False)
assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none."
elif is_enabled and len(disable_requests) > 0:
# queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_tag "unknown" with reason "synchronized with grid engine"
# queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_id "unknown" with reason "synchronized with grid engine"
for dr in disable_requests.values():
self.request_queue_machines_activation([queue_machine], dr.disable_tag, "synchronized with grid engine", perform_enable=False)
self.request_queue_machines_activation([queue_machine], dr.disable_id, "synchronized with grid engine", perform_enable=False)
assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests."
for queue_machine in db_queues:
logging.warning(f"Queue {queue_machine} is in the database but not in the grid engine.")
logging.warning("Queue %s is in the database but not in the grid engine. Removing it from the database.", queue_machine)
self.db_backend.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';")
self.db_backend.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';")
def get_state_as_json(self) -> Dict[str, Any]:
"""returns the state of the queues as a json string."""
@ -321,35 +335,110 @@ class QueueManager:
return state
def get_queue_machines(self, queue_machine: Union[QueueMachineId, QueueId]) -> List[QueueMachineId]:
logging.debug("Getting queue machines for queue_machine identifier '%s'", queue_machine)
queue_machines = []
if is_queue_machine_id(queue_machine):
return [queue_machine]
queue_machines = [queue_machine]
else:
status = self.grid_engine.get_status()
return [q for q in status.is_enabled if q.startswith(queue_machine)]
queue_machines = [q for q in status.is_enabled if q.startswith(queue_machine)]
assert len(queue_machines) > 0, f"No queue machine found for queue identifier '{queue_machine}'"
logging.debug("Found queue machines %s for queue_machine identifier '%s'", queue_machines, queue_machine)
return queue_machines
def main():
parser = argparse.ArgumentParser(
description="qman: manage queue disable/enable requests with logging",
prog="quman",
epilog="Example usage:\n"
" quman add-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance'\n"
" quman remove-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance completed'\n"
" quman add-disable-request main.q --disable-id admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'\n"
" quman show-disable-requests main.q@node42\n"
" quman show-disable-requests\n",
formatter_class=argparse.RawDescriptionHelpFormatter
)
subparsers = parser.add_subparsers(dest="action", help="Action to perform")
parser.add_argument("--test", action="store_true", help="Run in test mode with MockGridEngine and a local sqlite database. This is meant for testing and development purposes and should not be used in production.")
# add-disable action
add_parser = subparsers.add_parser("add-disable-request", help="adds a disable request to a queue")
add_parser.add_argument("queue", help="Queue to disable (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)")
add_parser.add_argument("--disable-id", required=True, help="id for the disable request (e.g., croconaus, manual.graffy, etc.)")
add_parser.add_argument("--reason", required=True, help="Reason for the disabling")
# remove-disable action
remove_parser = subparsers.add_parser("remove-disable-request", help="removes a disable request on a queue")
remove_parser.add_argument("queue", help="Queue to enable (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)")
remove_parser.add_argument("--disable-id", required=True, help="id of the disable request to remove")
remove_parser.add_argument("--reason", default="Manual removal", help="Reason for the removal")
# show-disable-requests action
show_parser = subparsers.add_parser("show-disable-requests", help="Show disable requests")
show_parser.add_argument("queue", nargs="?", default=None, help="Optional: specific queue to show (if omitted, shows all)")
try:
parser = argparse.ArgumentParser(description="qmod wrapper to manage queue states with a counter and logging.", epilog="Example usage: quman d main.q --disable-tag admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'")
parser.add_argument("action", choices=["d", "e"], help="Action: d (deactivate) or e (activate)")
parser.add_argument("queue", help="Queue to modify (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)")
parser.add_argument("--reason", required=True, help="Reason for the deactivation/activation")
parser.add_argument("--disable-tag", required=True, help="tag for the disable request (e.g., auto.croconaus, manual.graffy, etc.)")
args = parser.parse_args()
test_mode = args.test
if test_mode:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.warning("Running in test mode with MockGridEngine. No actual queue will be enabled or disabled.")
if args.action is None:
parser.print_help()
exit(1)
if test_mode:
queues_status_file_path = Path('./quman_test/queues_status.json')
if queues_status_file_path.exists():
logging.info("Loading queues status from %s", queues_status_file_path)
qs = QueuesStatus(queues_status_file_path)
else:
logging.info("No queues status file found at %s. Initializing with default queues status.", queues_status_file_path)
qs = QueuesStatus()
for node_id in range(40, 44):
qs.add_queue(f'main.q@alambix{node_id}.ipr.univ-rennes.fr', True)
qs.add_queue('gpuonly.q@alambix42', True)
grid_engine = MockGridEngine(qs)
else:
grid_engine = Sge()
db_backend = create_db_backend()
quman = QueueManager(db_backend, Sge(dry_run=False)) # set dry_run to False to actually run qmod commands
quman = QueueManager(db_backend, grid_engine)
quman.synchronize_with_grid_engine()
queue_machines = quman.get_queue_machines(args.queue)
if args.action == "d":
quman.request_queue_machines_deactivation(queue_machines, args.disable_tag, args.reason)
elif args.action == "e":
quman.request_queue_machines_activation(queue_machines, args.disable_tag, args.reason)
if args.action == "add-disable-request":
queue_machines = quman.get_queue_machines(args.queue)
quman.request_queue_machines_deactivation(queue_machines, args.disable_id, args.reason)
elif args.action == "remove-disable-request":
queue_machines = quman.get_queue_machines(args.queue)
quman.request_queue_machines_activation(queue_machines, args.disable_id, args.reason)
elif args.action == "show-disable-requests":
if args.queue is None:
# Show all disable requests
state = quman.get_state_as_json()
print(json.dumps(state, indent=2))
else:
# Show disable requests for a specific queue
queue_machines = quman.get_queue_machines(args.queue)
state = {}
for queue_machine in queue_machines:
disable_requests = quman.get_disable_requests(queue_machine)
state[queue_machine] = {
"disable_requests": [dr.as_dict() for dr in disable_requests.values()]
}
print(json.dumps(state, indent=2))
if test_mode:
grid_engine.get_status().save_as_json(queues_status_file_path)
except RuntimeError as e:
sys.stderr.write(f"An error occurred: {e}\n")
sys.stderr.write(f"ERROR: {e}\n")
exit(1)

View File

@ -1,4 +1,4 @@
__version__ = '1.0.15'
__version__ = '1.0.16'
class Version(object):

View File

@ -40,7 +40,7 @@ class QumanTestCase(unittest.TestCase):
print('disable requests:')
print(json.dumps(quman.get_state_as_json(), indent=2))
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
with self.assertRaises(AssertionError):
with self.assertRaises(RuntimeError):
# attempting to disable the same queue again with the same disable tag should raise an assertion error (the tag is used to uniquely identify the disables on the machine)
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'because I want to test quman')
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'croconaus.maco-update', 'disabled to update maco')