cocluto v1.0.17 - added a --json option to quman so that the user can choose to get results in a machine-friendly (JSON) or a human-friendly format

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
This commit is contained in:
Guillaume Raffy 2026-04-03 18:08:58 +02:00
parent 04adbbc684
commit f64e31b420
3 changed files with 49 additions and 26 deletions

View File

@ -205,6 +205,25 @@ class DisableRequest:
} }
class QueueDisableState:
    """Holds, for each queue machine, the list of its disable requests."""

    # maps a queue machine id to the disable requests recorded for it
    state: Dict[QueueMachineId, List[DisableRequest]]

    def __init__(self):
        """Create an empty state (no queue machine registered)."""
        self.state = {}

    def set_queue_machine_state(self, queue_machine: QueueMachineId, disable_requests: List[DisableRequest]):
        """Record the disable requests of a queue machine.

        Any list previously stored for ``queue_machine`` is replaced. The list
        is stored as is (not copied).

        Args:
            queue_machine: the queue machine the disable requests apply to.
            disable_requests: the disable requests of this queue machine.
        """
        # sanity check: every element must be a DisableRequest
        assert all(isinstance(dr, DisableRequest) for dr in disable_requests)
        self.state[queue_machine] = disable_requests

    def as_dict(self) -> Dict[QueueMachineId, Dict[str, list]]:
        """Return the state as a dictionary of plain containers.

        Returns:
            A dictionary (not a string) mapping each queue machine to
            ``{"disable_requests": [...]}``, where the list holds the result
            of ``as_dict()`` for each disable request of that queue machine.
        """
        return {
            queue_machine: {
                "disable_requests": [dr.as_dict() for dr in disable_requests]
            }
            for queue_machine, disable_requests in self.state.items()
        }
class QueueManager: class QueueManager:
db_backend: ISqlDatabaseBackend db_backend: ISqlDatabaseBackend
grid_engine: IGridEngine grid_engine: IGridEngine
@ -230,7 +249,7 @@ class QueueManager:
def get_disable_requests(self, queue_machine: QueueMachineId) -> Dict[int, DisableRequest]: def get_disable_requests(self, queue_machine: QueueMachineId) -> Dict[int, DisableRequest]:
sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_machines, log.reason, log.disable_id, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_machine = '{queue_machine}' AND log.action = 'disable';" sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_machines, log.reason, log.disable_id, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_machine = '{queue_machine}' AND log.action = 'disable';"
results = self.db_backend.query(sql_query) results = self.db_backend.query(sql_query)
disable_requests_as_dict = {} disable_requests = []
for row in results: for row in results:
log_id = row[0] log_id = row[0]
user_id = row[1] user_id = row[1]
@ -240,8 +259,8 @@ class QueueManager:
reason = row[4] reason = row[4]
disable_id = row[5] disable_id = row[5]
timestamp = datetime.fromisoformat(row[6]) timestamp = datetime.fromisoformat(row[6])
disable_requests_as_dict[row[0]] = DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_id=disable_id, timestamp=timestamp) disable_requests.append(DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_id=disable_id, timestamp=timestamp))
return disable_requests_as_dict return disable_requests
def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_disable: bool = True): def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_disable: bool = True):
logging.info("Requesting deactivation of queue machines %s with disable id '%s' for reason: %s", queue_machines, disable_id, reason) logging.info("Requesting deactivation of queue machines %s with disable id '%s' for reason: %s", queue_machines, disable_id, reason)
@ -249,7 +268,7 @@ class QueueManager:
for queue_machine in queue_machines: for queue_machine in queue_machines:
all_disable_requests[queue_machine] = self.get_disable_requests(queue_machine) all_disable_requests[queue_machine] = self.get_disable_requests(queue_machine)
disable_requests = all_disable_requests[queue_machine] disable_requests = all_disable_requests[queue_machine]
for dr in disable_requests.values(): for dr in disable_requests:
if dr.disable_id == disable_id: if dr.disable_id == disable_id:
raise RuntimeError(f"Disable id {disable_id} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first.") raise RuntimeError(f"Disable id {disable_id} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first.")
@ -268,7 +287,7 @@ class QueueManager:
for queue_machine in queue_machines: for queue_machine in queue_machines:
disable_requests = self.get_disable_requests(queue_machine) disable_requests = self.get_disable_requests(queue_machine)
dr_to_remove = None # the disable reason to remove dr_to_remove = None # the disable reason to remove
for dr in disable_requests.values(): for dr in disable_requests:
if dr.disable_id == disable_id: if dr.disable_id == disable_id:
dr_to_remove = dr dr_to_remove = dr
break break
@ -309,7 +328,7 @@ class QueueManager:
assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none." assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none."
elif is_enabled and len(disable_requests) > 0: elif is_enabled and len(disable_requests) > 0:
# queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_id "unknown" with reason "synchronized with grid engine" # queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_id "unknown" with reason "synchronized with grid engine"
for dr in disable_requests.values(): for dr in disable_requests:
self.request_queue_machines_activation([queue_machine], dr.disable_id, "synchronized with grid engine", perform_enable=False) self.request_queue_machines_activation([queue_machine], dr.disable_id, "synchronized with grid engine", perform_enable=False)
assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests." assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests."
for queue_machine in db_queues: for queue_machine in db_queues:
@ -317,8 +336,8 @@ class QueueManager:
self.db_backend.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';") self.db_backend.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';")
self.db_backend.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';") self.db_backend.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';")
def get_state_as_json(self) -> Dict[str, Any]: def get_state(self) -> QueueDisableState:
"""returns the state of the queues as a json string.""" """returns the state of the queues."""
# get the list of queue names from the disables table in the database # get the list of queue names from the disables table in the database
sql_query = "SELECT DISTINCT queue_machine FROM disables;" sql_query = "SELECT DISTINCT queue_machine FROM disables;"
results = self.db_backend.query(sql_query) results = self.db_backend.query(sql_query)
@ -326,12 +345,10 @@ class QueueManager:
assert len(row) == 1, "Each row should have only one column (queue_machine)" assert len(row) == 1, "Each row should have only one column (queue_machine)"
queue_machines = [row[0] for row in results] queue_machines = [row[0] for row in results]
state = {} state = QueueDisableState()
for queue_machine in queue_machines: for queue_machine in queue_machines:
disable_requests = self.get_disable_requests(queue_machine) disable_requests = self.get_disable_requests(queue_machine)
state[queue_machine] = { state.set_queue_machine_state(queue_machine, disable_requests)
"disable_requests": [dr.as_dict() for dr in disable_requests.values()]
}
return state return state
def get_queue_machines(self, queue_machine: Union[QueueMachineId, QueueId]) -> List[QueueMachineId]: def get_queue_machines(self, queue_machine: Union[QueueMachineId, QueueId]) -> List[QueueMachineId]:
@ -362,6 +379,7 @@ def main():
) )
subparsers = parser.add_subparsers(dest="action", help="Action to perform") subparsers = parser.add_subparsers(dest="action", help="Action to perform")
parser.add_argument("--test", action="store_true", help="Run in test mode with MockGridEngine and a local sqlite database. This is meant for testing and development purposes and should not be used in production.") parser.add_argument("--test", action="store_true", help="Run in test mode with MockGridEngine and a local sqlite database. This is meant for testing and development purposes and should not be used in production.")
parser.add_argument("--json", action="store_true", help="Output results in JSON format.")
# add-disable action # add-disable action
add_parser = subparsers.add_parser("add-disable-request", help="adds a disable request to a queue") add_parser = subparsers.add_parser("add-disable-request", help="adds a disable request to a queue")
@ -382,6 +400,9 @@ def main():
try: try:
args = parser.parse_args() args = parser.parse_args()
if args.action is None:
parser.print_help()
exit(1)
test_mode = args.test test_mode = args.test
@ -389,9 +410,8 @@ def main():
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.warning("Running in test mode with MockGridEngine. No actual queue will be enabled or disabled.") logging.warning("Running in test mode with MockGridEngine. No actual queue will be enabled or disabled.")
if args.action is None: else:
parser.print_help() logging.basicConfig(level=logging.WARNING)
exit(1)
if test_mode: if test_mode:
queues_status_file_path = Path('./quman_test/queues_status.json') queues_status_file_path = Path('./quman_test/queues_status.json')
@ -421,18 +441,21 @@ def main():
elif args.action == "show-disable-requests": elif args.action == "show-disable-requests":
if args.queue is None: if args.queue is None:
# Show all disable requests # Show all disable requests
state = quman.get_state_as_json() state = quman.get_state()
print(json.dumps(state, indent=2))
else: else:
# Show disable requests for a specific queue # Show disable requests for a specific queue
queue_machines = quman.get_queue_machines(args.queue) queue_machines = quman.get_queue_machines(args.queue)
state = {} state = QueueDisableState()
for queue_machine in queue_machines: for queue_machine in queue_machines:
disable_requests = quman.get_disable_requests(queue_machine) disable_requests = quman.get_disable_requests(queue_machine)
state[queue_machine] = { state.set_queue_machine_state(queue_machine, disable_requests)
"disable_requests": [dr.as_dict() for dr in disable_requests.values()] if args.json:
} print(json.dumps(state.as_dict(), indent=2))
print(json.dumps(state, indent=2)) else:
for queue_machine, disable_requests in state.state.items():
print(f"Queue machine: {queue_machine}")
for dr in disable_requests:
print(f" Disable request by {dr.user_id} on {dr.host_fqdn} at {dr.timestamp.isoformat()} with disable id '{dr.disable_id}' for reason: {dr.reason}")
if test_mode: if test_mode:
grid_engine.get_status().save_as_json(queues_status_file_path) grid_engine.get_status().save_as_json(queues_status_file_path)

View File

@ -1,4 +1,4 @@
__version__ = '1.0.16' __version__ = '1.0.17'
class Version(object): class Version(object):

View File

@ -32,13 +32,13 @@ class QumanTestCase(unittest.TestCase):
print('queues state:') print('queues state:')
grid_engine.queues_status.print() grid_engine.queues_status.print()
print('disable requests:') print('disable requests:')
print(json.dumps(quman.get_state_as_json(), indent=2)) print(json.dumps(quman.get_state().as_dict(), indent=2))
print('synchronizing with grid engine...') print('synchronizing with grid engine...')
quman.synchronize_with_grid_engine() quman.synchronize_with_grid_engine()
print('queues state:') print('queues state:')
grid_engine.queues_status.print() grid_engine.queues_status.print()
print('disable requests:') print('disable requests:')
print(json.dumps(quman.get_state_as_json(), indent=2)) print(json.dumps(quman.get_state().as_dict(), indent=2))
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'disabled to move the alambix42 to another rack') quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
# attempting to disable the same queue again with the same disable tag should raise an assertion error (the tag is used to uniquely identify the disables on the machine) # attempting to disable the same queue again with the same disable tag should raise an assertion error (the tag is used to uniquely identify the disables on the machine)
@ -54,7 +54,7 @@ class QumanTestCase(unittest.TestCase):
print('queues state:') print('queues state:')
grid_engine.queues_status.print() grid_engine.queues_status.print()
print('disable requests:') print('disable requests:')
print(json.dumps(quman.get_state_as_json(), indent=2)) print(json.dumps(quman.get_state().as_dict(), indent=2))
if __name__ == '__main__': if __name__ == '__main__':