- fixed bug in request_queue_activation, which caused it to always enable the queue, even if there are other disables
- added a synchronization mechanism to quman which patches the database to ensure the database is coherent with the current activation of the queues

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
This commit is contained in:
Guillaume Raffy 2026-04-01 19:08:06 +02:00
parent f5dce0bf10
commit 0f0d5f800e
3 changed files with 121 additions and 11 deletions

View File

@ -1,17 +1,33 @@
#!/usr/bin/env python3
from abc import ABC, abstractmethod
from typing import Dict
from typing import Dict, Any
import subprocess
import argparse
from datetime import datetime
from pathlib import Path
from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField # , SqlSshAccessedMysqlDb
from cocluto.ClusterController.QstatParser import QstatParser
from cocluto.ClusterController.JobsState import JobsState
LogId = int # identifies a log entry in the database
RequesterId = str # identifies the queue enable/disable requester eg auto.croconaus, manual.graffy, etc.
QueueMachineId = str # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr
class QueuesStatus():
is_enabled: Dict[QueueMachineId, bool]
def __init__(self):
self.is_enabled = {}
def add_queue(self, queue_name: QueueMachineId, is_enabled: bool):
self.is_enabled[queue_name] = is_enabled
def print(self):
for queue_name, is_enabled in self.is_enabled.items():
print(f"{queue_name}: {'enabled' if is_enabled else 'disabled'}")
class IGridEngine(ABC):
@abstractmethod
@ -22,6 +38,32 @@ class IGridEngine(ABC):
def enable_queue(self, queue_name: QueueMachineId):
pass
@abstractmethod
def get_status(self) -> QueuesStatus:
pass
class MockGridEngine(IGridEngine):
queues_status: QueuesStatus
def __init__(self, queues_status: QueuesStatus):
self.queues_status = queues_status
def disable_queue(self, queue_name: QueueMachineId):
print(f"Mock disable queue {queue_name}")
assert queue_name in self.queues_status.is_enabled, f"Queue {queue_name} not found in queues status"
assert self.queues_status.is_enabled[queue_name], f"Queue {queue_name} is already disabled"
self.queues_status.is_enabled[queue_name] = False
def enable_queue(self, queue_name: QueueMachineId):
print(f"Mock enable queue {queue_name}")
assert queue_name in self.queues_status.is_enabled, f"Queue {queue_name} not found in queues status"
assert not self.queues_status.is_enabled[queue_name], f"Queue {queue_name} is already enabled"
self.queues_status.is_enabled[queue_name] = True
def get_status(self) -> QueuesStatus:
return self.queues_status
class Sge(IGridEngine):
dry_run: bool
@ -43,6 +85,18 @@ class Sge(IGridEngine):
def enable_queue(self, queue_name: QueueMachineId):
self.run_qmod(["-e", queue_name])
def get_status(self) -> QueuesStatus:
process = subprocess.run(['qstat', '-f', '-u', '*'], check=True, capture_output=True)
# Parse the output to extract queue statuses
# This is a simplified example - you would need to parse the actual qstat output
queues_status = QueuesStatus()
jobs_state: JobsState = QstatParser.parseQstatOutput(process.stdout.decode())
queue_machines = jobs_state.getQueueMachines()
for queue_machine in queue_machines.itervalues():
queues_status.add_queue(queue_machine.get_name(), queue_machine.is_enabled())
return queues_status
def init_db(db_backend: ISqlDatabaseBackend):
if not db_backend.table_exists('log'):
@ -115,12 +169,13 @@ class QueueManager:
assert row[1] == queue_name, "All results should be for the same queue"
return {row[0]: DisableReason(log_id=row[0], queue_name=row[1], reason=row[2], requester_id=row[3], timestamp=datetime.fromisoformat(row[4])) for row in results}
def request_queue_deactivation(self, queue_name: QueueMachineId, requester_id: RequesterId, reason: str):
def request_queue_deactivation(self, queue_name: QueueMachineId, requester_id: RequesterId, reason: str, perform_disable: bool = True):
disable_reasons = self.get_disable_reasons(queue_name)
for dr in disable_reasons.values():
assert dr.requester_id != requester_id, f"Requester {requester_id} has already requested deactivation of queue {queue_name} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first."
if perform_disable:
if len(disable_reasons) == 0:
# queue is currently active, we can disable it
self.grid_engine.disable_queue(queue_name)
@ -128,7 +183,7 @@ class QueueManager:
disable_log_id = self.log_modification(queue_name, "disable", requester_id, reason)
self.db_backend.query(f"INSERT INTO state (disable_reason_id, queue_name) VALUES ({disable_log_id}, '{queue_name}');")
def request_queue_activation(self, queue_name: QueueMachineId, requester_id: RequesterId, reason: str):
def request_queue_activation(self, queue_name: QueueMachineId, requester_id: RequesterId, reason: str, perform_enable: bool = True):
disable_reasons = self.get_disable_reasons(queue_name)
dr_to_remove = None # the disable reason to remove
for dr in disable_reasons.values():
@ -138,10 +193,45 @@ class QueueManager:
assert dr_to_remove is not None, f"Requester {requester_id} has not requested deactivation of queue {queue_name}. Cannot request activation without a prior deactivation."
if perform_enable:
if len(disable_reasons) == 1:
# queue is currently disabled and there is only one disable reason, we can enable it
self.grid_engine.enable_queue(queue_name)
enable_log_id = self.log_modification(queue_name, "enable", requester_id, reason) # noqa: F841
self.db_backend.query(f"DELETE FROM state WHERE disable_reason_id = {dr_to_remove.log_id} AND queue_name = '{queue_name}';")
def synchronize_with_grid_engine(self):
"""synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat."""
qs = self.grid_engine.get_status()
for queue_name, is_enabled in qs.is_enabled.items():
disable_reasons = self.get_disable_reasons(queue_name)
if not is_enabled and len(disable_reasons) == 0:
# queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with requester_id "unknown" and reason "synchronized with grid engine"
self.request_queue_deactivation(queue_name, "quman-sync", "synchronized with grid engine", perform_disable=False)
assert len(self.get_disable_reasons(queue_name)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_name} but there are none."
elif is_enabled and len(disable_reasons) > 0:
# queue is enabled in the grid engine but there are disable reasons in the database, we remove all disable reasons for this queue and requester_id "unknown" with reason "synchronized with grid engine"
for dr in disable_reasons.values():
self.request_queue_activation(queue_name, dr.requester_id, "synchronized with grid engine", perform_enable=False)
assert len(self.get_disable_reasons(queue_name)) == 0, f"After synchronization, there should be no disable reasons for queue {queue_name} but there are still {len(self.get_disable_reasons(queue_name))} disable reasons."
def get_state_as_json(self) -> Dict[str, Any]:
"""returns the state of the queues as a json string."""
# get the list of queue names from the state table in the database
sql_query = "SELECT DISTINCT queue_name FROM state;"
results = self.db_backend.query(sql_query)
for row in results:
assert len(row) == 1, "Each row should have only one column (queue_name)"
queue_names = [row[0] for row in results]
state = {}
for queue_name in queue_names:
disable_reasons = self.get_disable_reasons(queue_name)
state[queue_name] = {
"disable_reasons": [{"reason": dr.reason, "requester_id": dr.requester_id, "timestamp": dr.timestamp.isoformat()} for dr in disable_reasons.values()]
}
return state
def main():

View File

@ -1,4 +1,4 @@
__version__ = '1.0.11'
__version__ = '1.0.12'
class Version(object):

View File

@ -1,9 +1,10 @@
from pathlib import Path
import json
import unittest
import logging
# from cocluto import ClusterController
from cocluto.SimpaDbUtil import SqliteDb
from cocluto.quman import QueueManager, init_db, Sge
from cocluto.quman import QueueManager, init_db, MockGridEngine, QueuesStatus
class QumanTestCase(unittest.TestCase):
@ -20,15 +21,34 @@ class QumanTestCase(unittest.TestCase):
db_path.unlink()
db_backend = SqliteDb(db_path)
init_db(db_backend)
quman = QueueManager(db_backend, Sge(dry_run=True)) # set dry_run to True to not actually run qmod commands
qs = QueuesStatus()
for node_id in range(40, 44):
qs.add_queue(f'main.q@alambix{node_id}', True)
qs.add_queue('gpuonly.q@alambix42', True)
grid_engine = MockGridEngine(qs)
grid_engine.disable_queue('main.q@alambix42') # simulate that the queue is already disabled)
quman = QueueManager(db_backend, grid_engine)
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')
print(json.dumps(quman.get_state_as_json(), indent=2))
print('synchronizing with grid engine...')
quman.synchronize_with_grid_engine()
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')
print(json.dumps(quman.get_state_as_json(), indent=2))
quman.request_queue_deactivation('main.q@alambix42', 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
with self.assertRaises(AssertionError):
# attempting to disable the same queue again with the same disable tag should raise an assertion error (the tag is used to uniquely identify the disables on the machine)
quman.request_queue_deactivation('main.q@alambix42', 'sysadmin.graffy', 'because I want to test quman')
quman.request_queue_deactivation('main.q@alambix42', 'croconaus.maco-update', 'disabled to update maco')
quman.request_queue_activation('main.q@alambix42', 'sysadmin.graffy', 'alambix42 has been moved to a new rack')
# self.assertIsInstance(job_state, JobsState)
db_backend.dump(Path('./quman_test/quman_dump.sql'))
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')
print(json.dumps(quman.get_state_as_json(), indent=2))
if __name__ == '__main__':