cocluto v1.0.13 - improvements to quman

- the user id and the fqdn of the host performing each action are now recorded
- found better names (more appropriate and easier to understand)
  - disable reason -> disable request
  - requester -> disable tag

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
This commit is contained in:
Guillaume Raffy 2026-04-02 19:31:31 +02:00
parent 12be70500b
commit bfe1eeb084
2 changed files with 85 additions and 52 deletions

View File

@ -10,8 +10,10 @@ from cocluto.ClusterController.QstatParser import QstatParser
from cocluto.ClusterController.JobsState import JobsState
LogId = int # id of an entry (row) of the `log` table in the database
RequesterId = str # legacy name for the enable/disable requester, eg auto.croconaus, manual.graffy, etc. NOTE(review): appears superseded by DisableTag - confirm nothing else still uses it
DisableTag = str # string that identifies a disable request, eg auto.croconaus, manual.graffy, etc.
QueueMachineId = str # identifies a queue machine (queue name + '@' + host), eg main.q@alambix42.ipr.univ-rennes.fr
HostFqdn = str # fully qualified domain name of a host, eg alambix42.ipr.univ-rennes.fr
UserId = str # unix user id of a user, eg graffy
class QueuesStatus():
@ -90,32 +92,37 @@ class Sge(IGridEngine):
# Parse the output to extract queue statuses
# This is a simplified example - you would need to parse the actual qstat output
queues_status = QueuesStatus()
jobs_state: JobsState = QstatParser.parseQstatOutput(process.stdout.decode())
queue_machines = jobs_state.getQueueMachines()
for queue_machine in queue_machines.itervalues():
qstat_parser = QstatParser()
jobs_state: JobsState = qstat_parser.parse_qstat_output(process.stdout.decode())
queue_machines = jobs_state.get_queue_machines()
for queue_machine in queue_machines.values():
queues_status.add_queue(queue_machine.get_name(), queue_machine.is_enabled())
return queues_status
def init_db(db_backend: ISqlDatabaseBackend):
    """Create the tables used by quman in `db_backend` when they are missing.

    Two tables are created, each only if `table_exists` reports it absent:

    - `log`: one row per queue modification (who, from which host, on which
      queue, which action, under which disable tag and for what reason).
    - `disables`: one row per disable request, linking a queue name to the
      `log.id` of the 'disable' action that created the request.

    :param db_backend: the sql backend in which the tables are created
    """
    # a table storing the log of actions (queue activation or deactivation)
    if not db_backend.table_exists('log'):
        fields = [
            SqlTableField('id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the modification', is_autoinc_index=True),
            SqlTableField('timestamp', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this modification has been made'),
            SqlTableField('user_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the id of user performing the action (unix user id)'),
            SqlTableField('host_fqdn', SqlTableField.Type.FIELD_TYPE_STRING, 'the fully qualified domain name of the host on which the action is performed'),
            SqlTableField('queue_name', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue that was modified'),
            SqlTableField('action', SqlTableField.Type.FIELD_TYPE_STRING, 'the action performed: "disable" or "enable"'),
            SqlTableField('disable_tag', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'),
            SqlTableField('reason', SqlTableField.Type.FIELD_TYPE_STRING, 'the reason for the modification'),
        ]
        db_backend.create_table('log', fields)

    # a table storing the current disable requests
    # NOTE(review): a database created before the rename may still hold the
    # former `state` table / `requester_id` column; nothing here migrates it.
    if not db_backend.table_exists('disables'):
        fields = [
            SqlTableField('disable_request_id', SqlTableField.Type.FIELD_TYPE_INT, 'log.id of the disable action that led to this state'),
            SqlTableField('queue_name', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue that was modified'),
        ]
        db_backend.create_table('disables', fields)
def create_db_backend() -> ISqlDatabaseBackend:
@ -131,19 +138,35 @@ def create_db_backend() -> ISqlDatabaseBackend:
return backend
class DisableRequest:
    """A request to keep a queue machine disabled, as read back from the `log` table."""
    log_id: int  # id of the 'disable' entry of the log table that recorded this request
    user_id: UserId  # the id of the user who requested the disable action (unix user id)
    host_fqdn: HostFqdn  # the fully qualified domain name of the host on which the disable action was requested
    queue_name: QueueMachineId  # the name of the queue machine that this request disables
    reason: str  # free text explaining why the queue machine is disabled
    disable_tag: DisableTag  # the tag identifying this disable request (e.g. auto.croconaus, manual.graffy)
    timestamp: datetime  # the time at which the disable action was logged

    def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_name: QueueMachineId, reason: str, disable_tag: DisableTag, timestamp: datetime):
        self.log_id = log_id
        self.user_id = user_id
        self.host_fqdn = host_fqdn
        self.queue_name = queue_name
        self.reason = reason
        self.disable_tag = disable_tag
        self.timestamp = timestamp

    @property
    def db_id(self) -> int:
        """Alias of `log_id`, kept because the class used to advertise a `db_id` attribute."""
        return self.log_id

    def as_dict(self) -> Dict[str, Any]:
        """Return the request as a dictionary of plain values (timestamp in ISO 8601 format)."""
        return {
            "log_id": self.log_id,
            "user_id": self.user_id,
            "host_fqdn": self.host_fqdn,
            "queue_name": self.queue_name,
            "reason": self.reason,
            "disable_tag": self.disable_tag,
            "timestamp": self.timestamp.isoformat(),
        }
class QueueManager:
db_backend: ISqlDatabaseBackend
@ -153,72 +176,83 @@ class QueueManager:
self.db_backend = db_backend
self.grid_engine = grid_engine
def log_modification(self, queue_name: str, action: str, disable_tag: DisableTag, reason: str) -> LogId:
    """Record a queue modification in the `log` table and return the id of the new entry.

    The unix user (output of `whoami`) and the host (output of `hostname -f`)
    running this code are stored along with the entry.

    :param queue_name: the queue machine that is modified
    :param action: either "disable" or "enable"
    :param disable_tag: the tag of the disable request this modification relates to
    :param reason: free text explaining the modification
    :return: the id of the inserted log entry
    :raises AssertionError: if `action` is neither "disable" nor "enable"
    """
    assert action in ["disable", "enable"], "Action must be either 'disable' or 'enable'"

    def sql_str(value) -> str:
        # Double embedded single quotes so that a value such as "can't boot"
        # cannot end the SQL string literal early (malformed query / injection).
        # NOTE(review): prefer bound parameters if ISqlDatabaseBackend.query supports them.
        return "'" + str(value).replace("'", "''") + "'"

    user_id = subprocess.check_output(['whoami']).decode().strip()
    host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip()
    timestamp = datetime.now().isoformat()
    values = ', '.join(sql_str(value) for value in (timestamp, user_id, host_fqdn, queue_name, action, disable_tag, reason))
    sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_name, action, disable_tag, reason) VALUES ({values});"
    self.db_backend.query(sql_query)
    # get the log id of the action that was just inserted
    # (last_insert_rowid() is an SQLite function, so this relies on an SQLite backend)
    log_id = self.db_backend.query("SELECT last_insert_rowid();")[0][0]
    return log_id
def get_disable_requests(self, queue_name: QueueMachineId) -> Dict[int, DisableRequest]:
    """Return the disable requests currently recorded for `queue_name`.

    The requests are the 'disable' entries of the `log` table that are
    referenced by a row of the `disables` table for this queue.

    :param queue_name: the queue machine whose disable requests are wanted
    :return: the disable requests, indexed by their log id (empty if there are none)
    """
    # double embedded single quotes so that the name cannot break out of the SQL string literal
    quoted_queue_name = "'" + queue_name.replace("'", "''") + "'"
    sql_query = (
        "SELECT log.id, log.user_id, log.host_fqdn, log.queue_name, log.reason, log.disable_tag, log.timestamp "
        "FROM log JOIN disables ON log.id = disables.disable_request_id "
        f"WHERE disables.queue_name = {quoted_queue_name} AND log.action = 'disable';"
    )
    results = self.db_backend.query(sql_query)

    disable_requests_as_dict = {}
    for row in results:
        # column order follows the SELECT clause above
        log_id = row[0]
        user_id = row[1]
        host_fqdn = row[2]
        queue_machine = row[3]
        assert queue_machine == queue_name, "All results should be for the same queue"
        reason = row[4]
        disable_tag = row[5]
        timestamp = datetime.fromisoformat(row[6])
        disable_requests_as_dict[log_id] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_name=queue_name, reason=reason, disable_tag=disable_tag, timestamp=timestamp)
    return disable_requests_as_dict
def request_queue_deactivation(self, queue_name: QueueMachineId, disable_tag: DisableTag, reason: str, perform_disable: bool = True):
    """Register a disable request for `queue_name` and disable the queue if it is not disabled yet.

    The request is always logged and added to the `disables` table; the grid
    engine is only asked to disable the queue when `perform_disable` is set
    and no other disable request exists for this queue.

    :param queue_name: the queue machine to disable
    :param disable_tag: the tag identifying this disable request; a tag can
        hold at most one disable request per queue at a time
    :param reason: free text explaining the request
    :param perform_disable: when False, only the database is updated (used
        when the grid engine is already in the wanted state)
    :raises AssertionError: if `disable_tag` already has a disable request on this queue
    """
    disable_requests = self.get_disable_requests(queue_name)
    for dr in disable_requests.values():
        assert dr.disable_tag != disable_tag, f"Disable tag {disable_tag} has already requested deactivation of queue {queue_name} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first."

    if perform_disable and not disable_requests:
        # queue is currently active, we can disable it
        self.grid_engine.disable_queue(queue_name)

    disable_log_id = self.log_modification(queue_name, "disable", disable_tag, reason)
    # double embedded single quotes so that the name cannot break out of the SQL string literal
    quoted_queue_name = "'" + queue_name.replace("'", "''") + "'"
    self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_name) VALUES ({disable_log_id}, {quoted_queue_name});")
def request_queue_activation(self, queue_name: QueueMachineId, disable_tag: DisableTag, reason: str, perform_enable: bool = True):
    """Withdraw the disable request of `disable_tag` on `queue_name` and enable the queue if it was the last one.

    The withdrawal is always logged and the request removed from the
    `disables` table; the grid engine is only asked to enable the queue when
    `perform_enable` is set and the withdrawn request is the only one left.

    :param queue_name: the queue machine to enable
    :param disable_tag: the tag of the disable request to withdraw
    :param reason: free text explaining the request
    :param perform_enable: when False, only the database is updated (used
        when the grid engine is already in the wanted state)
    :raises AssertionError: if `disable_tag` has no disable request on this queue
    """
    disable_requests = self.get_disable_requests(queue_name)
    # the disable request to remove: the first one carrying the given tag
    dr_to_remove = next((dr for dr in disable_requests.values() if dr.disable_tag == disable_tag), None)
    assert dr_to_remove is not None, f"Disable tag {disable_tag} has not requested deactivation of queue {queue_name}. Cannot request activation without a prior deactivation."

    if perform_enable and len(disable_requests) == 1:
        # queue is currently disabled and there is only one disable request, we can enable it
        self.grid_engine.enable_queue(queue_name)

    self.log_modification(queue_name, "enable", disable_tag, reason)
    # double embedded single quotes so that the name cannot break out of the SQL string literal
    quoted_queue_name = "'" + queue_name.replace("'", "''") + "'"
    self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_name = {quoted_queue_name};")
def synchronize_with_grid_engine(self):
    """synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat.

    The grid engine is taken as the reference; only the database is modified:

    - a queue disabled in the grid engine without any disable request in the
      database gets one, with disable tag "quman-sync";
    - a queue enabled in the grid engine loses all its disable requests.

    :raises AssertionError: if the database does not reflect the grid engine
        state after the update of a queue
    """
    qs = self.grid_engine.get_status()
    for queue_name, is_enabled in qs.is_enabled.items():
        disable_requests = self.get_disable_requests(queue_name)
        if not is_enabled and len(disable_requests) == 0:
            # queue is disabled in the grid engine but there is no disable request in the database:
            # we add a disable request with disable_tag "quman-sync" and reason "synchronized with grid engine"
            self.request_queue_deactivation(queue_name, "quman-sync", "synchronized with grid engine", perform_disable=False)
            assert len(self.get_disable_requests(queue_name)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_name} but there are none."
        elif is_enabled and len(disable_requests) > 0:
            # queue is enabled in the grid engine but there are disable requests in the database:
            # we withdraw each of them (under its own disable tag) with reason "synchronized with grid engine"
            for dr in disable_requests.values():
                self.request_queue_activation(queue_name, dr.disable_tag, "synchronized with grid engine", perform_enable=False)
            remaining_requests = self.get_disable_requests(queue_name)
            assert len(remaining_requests) == 0, f"After synchronization, there should be no disable reasons for queue {queue_name} but there are still {len(remaining_requests)} disable reasons."
def get_state_as_json(self) -> Dict[str, Any]:
"""returns the state of the queues as a json string."""
# get the list of queue names from the state table in the database
sql_query = "SELECT DISTINCT queue_name FROM state;"
# get the list of queue names from the disables table in the database
sql_query = "SELECT DISTINCT queue_name FROM disables;"
results = self.db_backend.query(sql_query)
for row in results:
assert len(row) == 1, "Each row should have only one column (queue_name)"
@ -226,9 +260,9 @@ class QueueManager:
state = {}
for queue_name in queue_names:
disable_reasons = self.get_disable_reasons(queue_name)
disable_requests = self.get_disable_requests(queue_name)
state[queue_name] = {
"disable_reasons": [{"reason": dr.reason, "requester_id": dr.requester_id, "timestamp": dr.timestamp.isoformat()} for dr in disable_reasons.values()]
"disable_requests": [dr.as_dict() for dr in disable_requests.values()]
}
return state
@ -239,7 +273,7 @@ def main():
parser.add_argument("action", choices=["d", "e"], help="Action: d (deactivate) or e (activate)")
parser.add_argument("queue", help="Queue to modify (e.g., main.q@node42@univ-rennes.fr)")
parser.add_argument("--reason", required=True, help="Reason for the deactivation/activation")
parser.add_argument("--requester", required=True, help="ID of the requester")
parser.add_argument("--disable-tag", required=True, help="tag for the disable request (e.g., auto.croconaus, manual.graffy, etc.)")
args = parser.parse_args()
db_backend = create_db_backend()
@ -248,10 +282,9 @@ def main():
queue = args.queue
if args.action == "d":
qmod.request_queue_deactivation(queue, args.requester, args.reason)
qmod.request_queue_deactivation(queue, args.disable_tag, args.reason)
elif args.action == "e":
qmod.request_queue_activation(queue, args.requester, args.reason)
qmod.db_backend.dump(Path('./quman_test/log.sql'))
qmod.request_queue_activation(queue, args.disable_tag, args.reason)
if __name__ == "__main__":

View File

@ -1,4 +1,4 @@
__version__ = '1.0.12'
__version__ = '1.0.13'
class Version(object):