cocluto v1.0.21 - added the option --db-def to quman

- this option allows the quman to work with user configurable database system as a backend. the default uses the database for alambix cluster
- also improved the output of show-disable-requests

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
This commit is contained in:
Guillaume Raffy 2026-04-10 17:58:26 +02:00
parent c80e3cd382
commit 6974f51221
2 changed files with 49 additions and 16 deletions

View File

@ -9,7 +9,7 @@ import re
import json
from datetime import datetime
from pathlib import Path
from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField # , SqlSshAccessedMysqlDb
from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField, SshAccessedMysqlDb
from cocluto.ClusterController.QstatParser import QstatParser
from cocluto.ClusterController.JobsState import JobsState
@ -74,13 +74,15 @@ class MockGridEngine(IGridEngine):
def disable_queue_machine(self, queue_machine: QueueMachineId):
print(f"Mock disable queue {queue_machine}")
assert queue_machine in self.queues_status.is_enabled, f"Queue {queue_machine} not found in queues status {self.queues_status.is_enabled}"
if queue_machine not in self.queues_status.is_enabled:
raise RuntimeError(f"Queue {queue_machine} not found in queues {list(self.queues_status.is_enabled.keys())}")
assert self.queues_status.is_enabled[queue_machine], f"Queue {queue_machine} is already disabled"
self.queues_status.is_enabled[queue_machine] = False
def enable_queue_machine(self, queue_machine: QueueMachineId):
print(f"Mock enable queue {queue_machine}")
assert queue_machine in self.queues_status.is_enabled, f"Queue machine {queue_machine} not found in queues status"
if queue_machine not in self.queues_status.is_enabled:
raise RuntimeError(f"Queue machine {queue_machine} not found in queues {list(self.queues_status.is_enabled.keys())}")
assert not self.queues_status.is_enabled[queue_machine], f"Queue machine {queue_machine} is already enabled"
self.queues_status.is_enabled[queue_machine] = True
@ -132,6 +134,7 @@ class Sge(IGridEngine):
def init_db(db_backend: ISqlDatabaseBackend):
# a table storing the log of actions (queue activation or deactivation)
if not db_backend.table_exists('log'):
fields = [
@ -162,14 +165,27 @@ def init_db(db_backend: ISqlDatabaseBackend):
db_backend.create_table('queues', fields)
def create_db_backend() -> ISqlDatabaseBackend:
# db_server_fqdn = 'alambix-master.ipr.univ-rennes.fr'
# db_user = 'qumanw'
# db_name = 'quman'
# ssh_user = 'qumanw'
def create_db_backend(db_def: str) -> ISqlDatabaseBackend:
# db_def: eg '{"type": "mysql", "args": {"server": "db.server.com", "user": "db_user", "name": "quman_db", "ssh_user": "ssh_user"}}' or '{"type": "sqlite", "args": {"database": "./quman.db"}}'
db_def = json.loads(db_def)
db_type = db_def['type']
db_params = db_def['params']
if db_type == 'mysql_via_ssh':
sql_server_fqdn = db_params['sql_server_fqdn'] # the fully qualified domain name of the mysql server hosting the database eg 'alambix-master.ipr.univ-rennes.fr'
db_user = db_params['db_user'] # eg 'qumanw'
db_name = db_params['db_name'] # the name of the database, eg 'quman'
ssh_user = db_params['ssh_user'] # the ssh user which has the privileges to access the mysql database, eg 'qumanw'
backend = SshAccessedMysqlDb(sql_server_fqdn, db_user, db_name, ssh_user)
command = f'ssh "{ssh_user}@{sql_server_fqdn}" "hostname"'
completed_process = subprocess.run(command, shell=True, check=False, capture_output=True)
if completed_process.returncode != 0:
raise RuntimeError(f"Failed to connect to the mysql server via ssh with command '{command}'. Please check the connection parameters and make sure that the ssh user has access to the server. Error message: {completed_process.stderr.decode()}")
elif db_type == 'sqlite':
db_path = Path(db_params['sqlite_file_path']) # eg./quman_test/quman.sqlite
backend = SqliteDb(db_path)
else:
raise ValueError("Unsupported database type: %s" % db_type)
# backend = SshAccessedMysqlDb(db_server_fqdn, db_user, db_name, ssh_user)
backend = SqliteDb(Path('./quman_test/quman.sqlite'))
init_db(backend)
return backend
@ -364,22 +380,39 @@ class QueueManager:
return queue_machines
class CustomHelpFormatter(
argparse.ArgumentDefaultsHelpFormatter, # to get the default values in the help message
argparse.RawDescriptionHelpFormatter # to get the newlines in the description and epilog to be properly formatted in the help message
):
pass
def main():
parser = argparse.ArgumentParser(
description="qman: manage queue disable/enable requests with logging",
prog="quman",
epilog="Example usage:\n"
epilog="Example usages:\n"
" quman add-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance'\n"
" quman remove-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance completed'\n"
" quman add-disable-request main.q --disable-id admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'\n"
" quman show-disable-requests main.q@node42\n"
" quman show-disable-requests\n",
formatter_class=argparse.RawDescriptionHelpFormatter
" quman show-disable-requests\n"
" quman --db-def='{\"type\": \"sqlite\", \"params\": {\"sqlite_file_path\": \"/tmp/quman.sqlite\"} }' show-disable-requests\n"
" quman --db-def='{\"type\": \"mysql_via_ssh\", \"params\": {\"sql_server_fqdn\": \"alambix-master.ipr.univ-rennes.fr\", \"db_user\": \"qumanw\", \"db_name\": \"quman\", \"ssh_user\": \"qumanw\"} }' show-disable-requests\n",
formatter_class=CustomHelpFormatter
)
subparsers = parser.add_subparsers(dest="action", help="Action to perform")
parser.add_argument("--test", action="store_true", help="Run in test mode with MockGridEngine and a local sqlite database. This is meant for testing and development purposes and should not be used in production.")
default_db_def = {
"type": "mysql_via_ssh",
"params": {
"sql_server_fqdn": "alambix-master.ipr.univ-rennes.fr",
"db_user": "qumanw",
"db_name": "quman",
"ssh_user": "qumanw"}}
parser.add_argument("--json", action="store_true", help="Output results in JSON format.")
parser.add_argument("--db-def", type=str, default=json.dumps(default_db_def), help="the definition in json format of the database storing the disable requests.")
# add-disable action
add_parser = subparsers.add_parser("add-disable-request", help="adds a disable request to a queue")
@ -430,7 +463,7 @@ def main():
grid_engine = MockGridEngine(qs)
else:
grid_engine = Sge()
db_backend = create_db_backend()
db_backend = create_db_backend(args.db_def)
quman = QueueManager(db_backend, grid_engine)
quman.synchronize_with_grid_engine()
@ -458,7 +491,7 @@ def main():
for queue_machine, disable_requests in state.state.items():
print(f"Queue machine: {queue_machine}")
for dr in disable_requests:
print(f" Disable request by {dr.user_id} on {dr.host_fqdn} at {dr.timestamp.isoformat()} with disable id '{dr.disable_id}' for reason: {dr.reason}")
print(f" Disable request {dr.log_id} by {dr.user_id} on {dr.host_fqdn} at {dr.timestamp.isoformat()} with disable id '{dr.disable_id}' for reason: {dr.reason}")
elif args.action == "update-db":
quman.synchronize_with_grid_engine()

View File

@ -1,4 +1,4 @@
__version__ = '1.0.20'
__version__ = '1.0.21'
class Version(object):