Compare commits

...

16 Commits

Author SHA1 Message Date
Guillaume Raffy f64e31b420 cocluto v1.0.17 - added json option to quman so that the user can choose to get results in machine-friendly (JSON) or human-friendly format
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-03 18:08:58 +02:00
Guillaume Raffy 04adbbc684 cocluto v1.0.16 - overhauled quman argument parsing
- quman's argument parsing now uses subparsers to allow for more actions than enable and disable
- added the show-disables action to quman
- added a test mode to the quman command line that lets the user exercise quman through its normal user interface
- renamed disable tag to disable id, as it's more meaningful (it identifies a disable request)
- made user errors more user-friendly (i.e., not polluted with a call stack)
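The subparser layout described above can be sketched as a minimal standalone example (the action and option names follow the description; the exact wiring is an assumption, not the actual quman code):

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    # one subparser per action, so new actions can be added beyond enable/disable
    parser = argparse.ArgumentParser(prog='quman')
    parser.add_argument('--test', action='store_true',
                        help='test mode: exercise quman without touching real queues')
    subparsers = parser.add_subparsers(dest='action')
    add = subparsers.add_parser('add-disable-request', help='disable a queue machine')
    add.add_argument('queue')
    add.add_argument('--disable-id', required=True)  # identifies the disable request
    add.add_argument('--reason', required=True)
    subparsers.add_parser('show-disable-requests').add_argument('queue', nargs='?')
    return parser

args = build_parser().parse_args(
    ['add-disable-request', 'main.q@node42', '--disable-id', 'manual.graffy', '--reason', 'maintenance'])
print(args.action, args.disable_id)
```

With `dest='action'`, the chosen subcommand name lands in `args.action`, which is what lets a single dispatch block handle each action.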

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-03 17:01:56 +02:00
Guillaume Raffy 25afd32504 cocluto v1.0.15 - added a user-friendly error message when qmod is not available
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-03 14:20:37 +02:00
Guillaume Raffy 4cc541d9c3 cocluto v1.0.14 - added support for full queues to quman
With this feature, it's now possible to disable an entire queue (and not just a single queue machine), with:

```sh
quman d main.q --disable-tag admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'
```
- also replaced the term queue name with queue machine for clarity, now that we have both QueueMachineId and QueueId
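The QueueId/QueueMachineId distinction can be sketched as follows (a minimal illustration; the '@'-suffix match and the example status dict are assumptions, not the actual cocluto code):

```python
import re
from typing import Dict, List

def is_queue_machine_id(s: str) -> bool:
    # a QueueMachineId contains '@' (e.g. main.q@alambix42); a bare QueueId (e.g. main.q) does not
    return re.match(r'^[a-zA-Z0-9_.]+@[a-zA-Z0-9_.-]+$', s) is not None

def expand(queue: str, status: Dict[str, bool]) -> List[str]:
    # a QueueMachineId is returned as-is; a QueueId matches every machine of that queue
    if is_queue_machine_id(queue):
        return [queue]
    return [q for q in status if q.startswith(queue + '@')]

status = {'main.q@alambix40': True, 'main.q@alambix41': False, 'gpuonly.q@alambix42': True}
print(expand('main.q', status))
print(expand('gpuonly.q@alambix42', status))
```

Expanding a bare QueueId to its queue machines is what makes "disable an entire queue" a loop over the same per-machine primitive.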

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-03 11:58:47 +02:00
Guillaume Raffy bfe1eeb084 cocluto v1.0.13 - improvements to quman
- the userid and the host fqdn are now recorded
- found better names (more appropriate and easier to understand)
  - disable reason -> disable request
  - requester -> disable tag

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-02 19:31:31 +02:00
Guillaume Raffy 12be70500b fixed bug that caused quman test to fail because the temporary directory was missing
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-02 18:13:39 +02:00
Guillaume Raffy 7acaa1ad5a added continuous integration to cocluto
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-02 18:09:17 +02:00
Guillaume Raffy 0f0d5f800e v 1.0.12
- fixed bug in request_queue_activation, which caused it to always enable the queue, even if other disable requests were present
- added a synchronization mechanism to quman which patches the database to keep it coherent with the current activation state of the queues
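A minimal sketch of such a reconciliation pass, assuming a dict of actual queue states and a set of queues the database records as disabled (the names and return shape are illustrative, not the actual quman code):

```python
from typing import Dict, Set

def synchronize(actual: Dict[str, bool], db_disabled: Set[str]) -> Dict[str, Set[str]]:
    """Compute the patch that makes the database view agree with the grid engine."""
    to_add = set()     # disabled in the engine but unknown to the database
    to_remove = set()  # recorded as disabled but actually enabled
    for queue, enabled in actual.items():
        if not enabled and queue not in db_disabled:
            to_add.add(queue)
        elif enabled and queue in db_disabled:
            to_remove.add(queue)
    return {'add': to_add, 'remove': to_remove}

patch = synchronize({'main.q@n1': False, 'main.q@n2': True}, {'main.q@n2'})
print(patch)
```

Treating the grid engine as the source of truth and patching the database toward it keeps the two from drifting apart after manual qmod calls.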

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-01 19:08:06 +02:00
Guillaume Raffy f5dce0bf10 v 1.0.11
- added an IGridEngine abstraction that allows switching between dry-run mode and normal mode
- added more commands to the test
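The abstraction described above can be sketched with an abstract base class and a dry-run implementation (the method names follow those used elsewhere in cocluto; the action-recording dry-run engine is an illustrative assumption):

```python
from abc import ABC, abstractmethod

class IGridEngine(ABC):
    @abstractmethod
    def disable_queue_machine(self, queue_machine: str): ...
    @abstractmethod
    def enable_queue_machine(self, queue_machine: str): ...

class DryRunEngine(IGridEngine):
    # records actions instead of calling qmod, so tests never touch real queues
    def __init__(self):
        self.actions = []
    def disable_queue_machine(self, queue_machine: str):
        self.actions.append(('disable', queue_machine))
    def enable_queue_machine(self, queue_machine: str):
        self.actions.append(('enable', queue_machine))

engine = DryRunEngine()
engine.disable_queue_machine('main.q@alambix42')
print(engine.actions)
```

Callers depend only on the interface, so swapping the real qmod-backed engine for a dry-run one requires no change to the queue-management logic.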

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-01 17:31:45 +02:00
Guillaume Raffy a7d92a3f99 v 1.0.10
- fixed iteration bug in quman
- added missing unit test

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-01 16:33:46 +02:00
Guillaume Raffy 3beba78ecc started to develop quman, a tool to handle multiple disable requests on the same queue
- to make this work, cocluto now uses a pyproject.toml configuration
- cocluto v1.0.9
- this is still work in progress, but the basic behaviour is in place

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-01 16:12:17 +02:00
Guillaume Raffy ba3d410b3f now that non-IPR (IETR) machines have been migrated from physix to alambix, updated the graphs generator to exclude these machines from the graphs.
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3560]
fixes [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=4335]
2026-03-06 19:03:54 +01:00
Guillaume Raffy a8b477978d now powermap draws redundant cables as dashed lines.
fixes Bug 4250 - inconsistency in the power consumption figures displayed by powermap
2025-12-11 14:23:07 +01:00
Guillaume Raffy 8a2c46c377 cocluto v1.08
- fixed bug that caused `draw_machine_age_pyramid_graph` to fail when some cluster machines get older than 20 years. Increased the age limit to 25 years, and added clipping to the graph to cope with machines even older than that
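The clipping fix amounts to guarding the histogram index (a minimal sketch, with a plain list standing in for the numpy array and made-up ages):

```python
oldest_age = 25                      # the new upper limit of the age pyramid
age_histogram = [0] * oldest_age
for age_in_years in (3, 24, 27):     # 27 exceeds the pyramid's range
    if age_in_years < oldest_age:    # clip: machines older than the last bucket are ignored
        age_histogram[age_in_years] += 1
print(sum(age_histogram))
```

Without the guard, a machine older than `oldest_age` indexes past the end of the histogram and the graph generation fails.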

fixes [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=4101]
2025-06-24 16:10:09 +02:00
Guillaume Raffy 8679ae2ca5 cocluto v 1.07
fixed logic error found when working on [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3979]
2024-11-15 16:41:41 +01:00
Guillaume Raffy 27493f2ed7 fixed logic error found when working on [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3979] 2024-11-15 16:40:23 +01:00
9 changed files with 663 additions and 24 deletions

59
ci/ipr.jenkins Normal file

@@ -0,0 +1,59 @@
// Jenkinsfile for jenkins.ipr.univ-rennes1.fr (Institut de Physique de Rennes)
pipeline {
agent {label 'alambix_agent'}
environment {
VENV_PATH = "${WORKSPACE}/cocluto.venv"
}
stages {
stage('setup') {
steps {
echo 'setting up cocluto test environment'
sh """#!/bin/bash
python3 -m venv ${VENV_PATH} &&
source ${VENV_PATH}/bin/activate &&
pip install --upgrade pip &&
pip install --upgrade setuptools &&
pip install .
"""
}
}
stage('testing cocluto (cluster tools)') {
steps {
sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_cocluto
"""
}
}
stage('testing simpadb') {
steps {
sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_simpadb
"""
}
}
stage('testing quman') {
steps {
sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_quman
"""
}
}
}
post
{
// always, success, failure, unstable, changed
failure
{
mail bcc: '', body: "<b>Jenkins build failed</b><br>Project: ${env.JOB_NAME} <br>Build Number: ${env.BUILD_NUMBER} <br>Build URL: ${env.BUILD_URL}", cc: 'guillaume.raffy@univ-rennes.fr, julien.dasilva@univ-rennes.fr', charset: 'UTF-8', from: '', mimeType: 'text/html', replyTo: '', subject: "CI build failed for ${env.JOB_NAME}", to: "info-ipr@univ-rennes1.fr";
}
cleanup {
cleanWs()
}
}
}


@@ -501,25 +501,29 @@ def power_config_to_svg(power_config: PowerConfig, svg_file_path: Path, worst_ca
for con in power_config.connections:
# print(con.from_plug.machine.name, con.to_plug.machine.name)
if not con.is_redundancy_cable(): # don't display redundancy cables, as they might overlap and hide the main one
power_consumption = con.get_power_consumption(worst_case_scenario=worst_case_scenario)
amperes = power_consumption / 220.0
color = '/svg/green'
capacity = con.get_max_amperes()
penwidth_scaler = 0.25
if capacity is None:
max_amp = '? A'
color = '/svg/red'
penwidth = 100.0 * penwidth_scaler # make the problem clearly visible
else:
max_amp = str(capacity) + 'A'
color = cable_colorer.get_cable_color(con, worst_case_scenario=worst_case_scenario)
penwidth = capacity * penwidth_scaler
label = "%.1f/%s" % (amperes, max_amp)
# color='//%d' % int(9.0-amperes/capacity*8)
power_consumption = con.get_power_consumption(worst_case_scenario=worst_case_scenario)
amperes = power_consumption / 220.0
color = '/svg/green'
capacity = con.get_max_amperes()
penwidth_scaler = 0.25
if capacity is None:
max_amp = '? A'
color = '/svg/red'
penwidth = 100.0 * penwidth_scaler # make the problem clearly visible
else:
max_amp = str(capacity) + 'A'
color = cable_colorer.get_cable_color(con, worst_case_scenario=worst_case_scenario)
penwidth = capacity * penwidth_scaler
label = "%.1f/%s" % (amperes, max_amp)
# color='//%d' % int(9.0-amperes/capacity*8)
# graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color="%s:%s" % (color, wsc_color), label=label, penwidth="%s:%s" % (penwidth, penwidth))
graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color=color, label=label, penwidth=penwidth)
if con.is_redundancy_cable():
edge_style = 'dashed'
else:
edge_style = 'solid'
# graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color="%s:%s" % (color, wsc_color), label=label, penwidth="%s:%s" % (penwidth, penwidth))
graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color=color, label=label, penwidth=penwidth, style=edge_style)
for rack_id, rack in racks.items():
# sub = graph.add_subgraph(rack, name='cluster_%s' % rack_id, rank='same')
@@ -534,3 +538,4 @@ def power_config_to_svg(power_config: PowerConfig, svg_file_path: Path, worst_ca
graph.layout(prog='dot')
graph.draw(svg_file_path)


@@ -253,7 +253,7 @@ class SqliteDb(ISqlDatabaseBackend):
# If set False, the returned connection may be shared across multiple threads. When using multiple threads with the same connection writing operations should be serialized by the user to avoid data corruption
# I hope it's safe here but I'm not 100% sure though. Anyway, if the database gets corrupt, it not a big deal since this memory resident database gets reconstructed from the sql file...
if sqlite_db_path != ':memory:' and not sqlite_db_path.exists():
if sqlite_db_path == ':memory:' or not sqlite_db_path.exists():
logging.debug('creating sqlite database in %s', sqlite_db_path)
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
else:


@@ -40,6 +40,10 @@ def is_test_machine(name):
'physix13',
'physix14',
'physix15',
'alambix12',
'alambix13',
'alambix14',
'alambix15',
]
@@ -315,7 +319,7 @@ def draw_machine_age_pyramid_graph(inventory):
:param Inventory inventory: the inventory database
"""
oldest_age = 20
oldest_age = 25
age_histogram = np.zeros(shape=(oldest_age))
rows = inventory.query("SELECT * FROM machines")
@@ -328,7 +332,9 @@ def draw_machine_age_pyramid_graph(inventory):
if purchase_date is not None:
purchase_time = matplotlib.dates.date2num(purchase_date.date()) # noqa: F841
age = datetime.datetime.now() - purchase_date
age_histogram[age.days // 365] += 1
age_in_years = age.days // 365
if age_in_years < oldest_age:
age_histogram[age_in_years] += 1
# print(name, age)
fig, ax = plt.subplots()
@@ -349,7 +355,7 @@ def draw_core_age_pyramid_graph(inventory):
:param Inventory inventory: the inventory database
"""
oldest_age = 20
oldest_age = 25
age_histogram = np.zeros(shape=(oldest_age))
rows = inventory.query("SELECT * FROM machines")
@@ -362,7 +368,9 @@ def draw_core_age_pyramid_graph(inventory):
if purchase_date is not None:
purchase_time = matplotlib.dates.date2num(purchase_date.date()) # noqa: F841
age = datetime.datetime.now() - purchase_date
age_histogram[age.days // 365] += inventory.get_num_cores(name)
age_in_years = age.days // 365
if age_in_years < oldest_age:
age_histogram[age_in_years] += inventory.get_num_cores(name)
# print(name, age)
fig, ax = plt.subplots()

469
cocluto/quman.py Executable file

@@ -0,0 +1,469 @@
#!/usr/bin/env python3
from abc import ABC, abstractmethod
import sys
from typing import List, Dict, Any, Union
import logging
import subprocess
import argparse
import re
import json
from datetime import datetime
from pathlib import Path
from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField # , SqlSshAccessedMysqlDb
from cocluto.ClusterController.QstatParser import QstatParser
from cocluto.ClusterController.JobsState import JobsState
LogId = int # identifies a log entry in the database
DisableId = str # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc.
QueueId = str # identifies the queue eg main.q
QueueMachineId = str # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr
HostFqdn = str # fully qualified domain name of a host eg alambix42.ipr.univ-rennes.fr
UserId = str # unix user id of a user eg graffy
def is_queue_machine_id(s: str) -> bool:
"""returns True if the given string is a QueueMachineId (i.e., it contains '@') and False otherwise."""
return re.match(r'^[a-zA-Z0-9_\.]+@[a-zA-Z0-9_\-\.]+$', s) is not None
class QueuesStatus():
is_enabled: Dict[QueueMachineId, bool]
def __init__(self, queues_status_file_path: Path = None):
self.is_enabled = {}
if queues_status_file_path is not None:
self.load_from_json(queues_status_file_path)
def load_from_json(self, path: Path):
with open(path, 'r', encoding='utf-8') as f:
self.is_enabled = json.load(f)
def add_queue(self, queue_machine: QueueMachineId, is_enabled: bool):
self.is_enabled[queue_machine] = is_enabled
def print(self):
for queue_machine, is_enabled in self.is_enabled.items():
print(f"{queue_machine}: {'enabled' if is_enabled else 'disabled'}")
def save_as_json(self, path: Path):
with open(path, 'w', encoding='utf-8') as f:
json.dump(self.is_enabled, f, indent=2)
class IGridEngine(ABC):
@abstractmethod
def disable_queue_machine(self, queue_machine: QueueMachineId):
pass
@abstractmethod
def enable_queue_machine(self, queue_machine: QueueMachineId):
pass
@abstractmethod
def get_status(self) -> QueuesStatus:
pass
class MockGridEngine(IGridEngine):
queues_status: QueuesStatus
def __init__(self, queues_status: QueuesStatus):
self.queues_status = queues_status
def disable_queue_machine(self, queue_machine: QueueMachineId):
print(f"Mock disable queue {queue_machine}")
assert queue_machine in self.queues_status.is_enabled, f"Queue {queue_machine} not found in queues status {self.queues_status.is_enabled}"
assert self.queues_status.is_enabled[queue_machine], f"Queue {queue_machine} is already disabled"
self.queues_status.is_enabled[queue_machine] = False
def enable_queue_machine(self, queue_machine: QueueMachineId):
print(f"Mock enable queue {queue_machine}")
assert queue_machine in self.queues_status.is_enabled, f"Queue machine {queue_machine} not found in queues status"
assert not self.queues_status.is_enabled[queue_machine], f"Queue machine {queue_machine} is already enabled"
self.queues_status.is_enabled[queue_machine] = True
def get_status(self) -> QueuesStatus:
return self.queues_status
class Sge(IGridEngine):
dry_run: bool
def __init__(self, dry_run: bool = False):
self.dry_run = dry_run
if not self.dry_run:
# check that qmod command is available
try:
subprocess.run(["qmod", "-h"], check=True, capture_output=True)
except FileNotFoundError as exc:
raise RuntimeError("qmod command not found. Please make sure that the grid engine client is installed and qmod command is available in the PATH.") from exc
def run_qmod(self, args):
"""runs qmod with the given arguments."""
cmd = ["qmod"] + args
if self.dry_run:
print(f"Dry run: {' '.join(cmd)}")
else:
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"qmod command failed: {e}") from e
def disable_queue_machine(self, queue_machine: QueueMachineId):
self.run_qmod(["-d", queue_machine])
def enable_queue_machine(self, queue_machine: QueueMachineId):
self.run_qmod(["-e", queue_machine])
def get_status(self) -> QueuesStatus:
process = subprocess.run(['qstat', '-f', '-u', '*'], check=True, capture_output=True)
# parse the qstat output to extract each queue machine's enabled/disabled state
queues_status = QueuesStatus()
qstat_parser = QstatParser()
jobs_state: JobsState = qstat_parser.parse_qstat_output(process.stdout.decode())
queue_machines = jobs_state.get_queue_machines()
for queue_machine in queue_machines.values():
queues_status.add_queue(queue_machine.get_name(), queue_machine.is_enabled())
return queues_status
def init_db(db_backend: ISqlDatabaseBackend):
# a table storing the log of actions (queue activation or deactivation)
if not db_backend.table_exists('log'):
fields = [
SqlTableField('id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the modification', is_autoinc_index=True),
SqlTableField('timestamp', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this modification has been made'),
SqlTableField('user_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the id of user performing the action (unix user id)'),
SqlTableField('host_fqdn', SqlTableField.Type.FIELD_TYPE_STRING, 'the fully qualified domain name of the host on which the action is performed'),
SqlTableField('queue_machines', SqlTableField.Type.FIELD_TYPE_STRING, 'the comma separated list of queue machines that were modified'),
SqlTableField('action', SqlTableField.Type.FIELD_TYPE_STRING, 'the action performed: "disable" or "enable"'),
SqlTableField('disable_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'),
SqlTableField('reason', SqlTableField.Type.FIELD_TYPE_STRING, 'the reason for the modification'),
]
db_backend.create_table('log', fields)
# a table storing the current disable requests
if not db_backend.table_exists('disables'):
fields = [
SqlTableField('disable_request_id', SqlTableField.Type.FIELD_TYPE_INT, 'log.id of the disable action that led to this state'),
SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue machine that was disabled'),
]
db_backend.create_table('disables', fields)
# a table storing the current queues
if not db_backend.table_exists('queues'):
fields = [
SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue'),
]
db_backend.create_table('queues', fields)
def create_db_backend() -> ISqlDatabaseBackend:
# db_server_fqdn = 'alambix-master.ipr.univ-rennes.fr'
# db_user = 'qumanw'
# db_name = 'quman'
# ssh_user = 'qumanw'
# backend = SshAccessedMysqlDb(db_server_fqdn, db_user, db_name, ssh_user)
backend = SqliteDb(Path('./quman_test/quman.sqlite'))
init_db(backend)
return backend
class DisableRequest:
log_id: int # index of this disable request in the database (log.id)
user_id: UserId # the id of the user who requested the disable action (unix user id)
host_fqdn: HostFqdn # the fully qualified domain name of the host on which the disable action was requested
queue_machines: List[QueueMachineId] # the names of the queue machines that were modified
reason: str
disable_id: DisableId
timestamp: datetime
def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_machines: List[QueueMachineId], reason: str, disable_id: DisableId, timestamp: datetime):
self.log_id = log_id
self.user_id = user_id
self.host_fqdn = host_fqdn
self.queue_machines = queue_machines
self.reason = reason
self.disable_id = disable_id
self.timestamp = timestamp
def as_dict(self) -> Dict[str, Any]:
return {
"log_id": self.log_id,
"user_id": self.user_id,
"host_fqdn": self.host_fqdn,
"queue_machines": self.queue_machines,
"reason": self.reason,
"disable_id": self.disable_id,
"timestamp": self.timestamp.isoformat(),
}
class QueueDisableState:
state: Dict[QueueMachineId, List[DisableRequest]]
def __init__(self):
self.state = {}
def set_queue_machine_state(self, queue_machine: QueueMachineId, disable_requests: List[DisableRequest]):
assert all(isinstance(dr, DisableRequest) for dr in disable_requests)
self.state[queue_machine] = disable_requests
def as_dict(self) -> str:
self_as_dict = {}
for queue_machine, disable_requests in self.state.items():
self_as_dict[queue_machine] = {
"disable_requests": [dr.as_dict() for dr in disable_requests]
}
return self_as_dict
class QueueManager:
db_backend: ISqlDatabaseBackend
grid_engine: IGridEngine
def __init__(self, db_backend: ISqlDatabaseBackend, grid_engine: IGridEngine = None):
self.db_backend = db_backend
if grid_engine is None:
grid_engine = Sge()
self.grid_engine = grid_engine
def log_modification(self, queue_machines: List[QueueMachineId], action: str, disable_id: DisableId, reason: str) -> LogId:
assert isinstance(queue_machines, list)
assert action in ["disable", "enable"], "Action must be either 'disable' or 'enable'"
userid = subprocess.check_output(['whoami']).decode().strip()
host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip()
timestamp = datetime.now().isoformat()
sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_id, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{','.join(queue_machines)}', '{action}', '{disable_id}', '{reason}');"
self.db_backend.query(sql_query)
# get the log id of the disable action that was just inserted
log_id = self.db_backend.query("SELECT last_insert_rowid();")[0][0]
return log_id
def get_disable_requests(self, queue_machine: QueueMachineId) -> Dict[int, DisableRequest]:
sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_machines, log.reason, log.disable_id, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_machine = '{queue_machine}' AND log.action = 'disable';"
results = self.db_backend.query(sql_query)
disable_requests = []
for row in results:
log_id = row[0]
user_id = row[1]
host_fqdn = row[2]
queue_machines = row[3].split(',')
assert queue_machine in queue_machines, "All results should be for the same queue"
reason = row[4]
disable_id = row[5]
timestamp = datetime.fromisoformat(row[6])
disable_requests.append(DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_id=disable_id, timestamp=timestamp))
return disable_requests
def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_disable: bool = True):
logging.info("Requesting deactivation of queue machines %s with disable id '%s' for reason: %s", queue_machines, disable_id, reason)
all_disable_requests = {}
for queue_machine in queue_machines:
all_disable_requests[queue_machine] = self.get_disable_requests(queue_machine)
disable_requests = all_disable_requests[queue_machine]
for dr in disable_requests:
if dr.disable_id == disable_id:
raise RuntimeError(f"Disable id {disable_id} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first.")
for queue_machine in queue_machines:
if perform_disable:
disable_requests = all_disable_requests[queue_machine]
if len(disable_requests) == 0:
# queue is currently active, we can disable it
self.grid_engine.disable_queue_machine(queue_machine)
disable_log_id = self.log_modification(queue_machines, "disable", disable_id, reason)
for queue_machine in queue_machines:
self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_machine) VALUES ({disable_log_id}, '{queue_machine}');")
def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_enable: bool = True):
for queue_machine in queue_machines:
disable_requests = self.get_disable_requests(queue_machine)
dr_to_remove = None # the disable request to remove
for dr in disable_requests:
if dr.disable_id == disable_id:
dr_to_remove = dr
break
if dr_to_remove is None:
raise RuntimeError(f"Disable id {disable_id} has not requested deactivation of queue {queue_machine}. Cannot request activation without a prior deactivation.")
if perform_enable:
if len(disable_requests) == 1:
# queue is currently disabled and there is only one disable reason, we can enable it
self.grid_engine.enable_queue_machine(queue_machine)
enable_log_id = self.log_modification(queue_machines, "enable", disable_id, reason) # noqa: F841 pylint:disable=unused-variable
for queue_machine in queue_machines:
self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';")
def synchronize_with_grid_engine(self):
"""synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat."""
qs = self.grid_engine.get_status()
db_queues = set()
sql_query = "SELECT queue_machine FROM queues;"
results = self.db_backend.query(sql_query)
for row in results:
assert len(row) == 1, "Each row should have only one column (queue_machine)"
db_queues.add(row[0])
for queue_machine, is_enabled in qs.is_enabled.items():
if queue_machine not in db_queues:
# if the queue is not in the database, we add it
logging.warning("Queue %s is not in the database, adding it.", queue_machine)
self.db_backend.query(f"INSERT INTO queues (queue_machine) VALUES ('{queue_machine}');")
else:
db_queues.remove(queue_machine) # we remove it from the set of queues in the database to keep track of the queues that are in the database but not in the grid engine
disable_requests = self.get_disable_requests(queue_machine)
if not is_enabled and len(disable_requests) == 0:
# queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_id "unknown" and reason "synchronized with grid engine"
self.request_queue_machines_deactivation([queue_machine], "quman-sync", "synchronized with grid engine", perform_disable=False)
assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none."
elif is_enabled and len(disable_requests) > 0:
# queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_id "unknown" with reason "synchronized with grid engine"
for dr in disable_requests:
self.request_queue_machines_activation([queue_machine], dr.disable_id, "synchronized with grid engine", perform_enable=False)
assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests."
for queue_machine in db_queues:
logging.warning("Queue %s is in the database but not in the grid engine. Removing it from the database.", queue_machine)
self.db_backend.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';")
self.db_backend.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';")
def get_state(self) -> QueueDisableState:
"""returns the state of the queues."""
# get the list of queue names from the disables table in the database
sql_query = "SELECT DISTINCT queue_machine FROM disables;"
results = self.db_backend.query(sql_query)
for row in results:
assert len(row) == 1, "Each row should have only one column (queue_machine)"
queue_machines = [row[0] for row in results]
state = QueueDisableState()
for queue_machine in queue_machines:
disable_requests = self.get_disable_requests(queue_machine)
state.set_queue_machine_state(queue_machine, disable_requests)
return state
def get_queue_machines(self, queue_machine: Union[QueueMachineId, QueueId]) -> List[QueueMachineId]:
logging.debug("Getting queue machines for queue_machine identifier '%s'", queue_machine)
queue_machines = []
if is_queue_machine_id(queue_machine):
queue_machines = [queue_machine]
else:
status = self.grid_engine.get_status()
queue_machines = [q for q in status.is_enabled if q.startswith(queue_machine)]
assert len(queue_machines) > 0, f"No queue machine found for queue identifier '{queue_machine}'"
logging.debug("Found queue machines %s for queue_machine identifier '%s'", queue_machines, queue_machine)
return queue_machines
def main():
parser = argparse.ArgumentParser(
description="qman: manage queue disable/enable requests with logging",
prog="quman",
epilog="Example usage:\n"
" quman add-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance'\n"
" quman remove-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance completed'\n"
" quman add-disable-request main.q --disable-id admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'\n"
" quman show-disable-requests main.q@node42\n"
" quman show-disable-requests\n",
formatter_class=argparse.RawDescriptionHelpFormatter
)
subparsers = parser.add_subparsers(dest="action", help="Action to perform")
parser.add_argument("--test", action="store_true", help="Run in test mode with MockGridEngine and a local sqlite database. This is meant for testing and development purposes and should not be used in production.")
parser.add_argument("--json", action="store_true", help="Output results in JSON format.")
# add-disable action
add_parser = subparsers.add_parser("add-disable-request", help="adds a disable request to a queue")
add_parser.add_argument("queue", help="Queue to disable (e.g., main.q@node42.univ-rennes.fr, main.q, etc.)")
add_parser.add_argument("--disable-id", required=True, help="id for the disable request (e.g., croconaus, manual.graffy, etc.)")
add_parser.add_argument("--reason", required=True, help="Reason for the disabling")
# remove-disable action
remove_parser = subparsers.add_parser("remove-disable-request", help="removes a disable request on a queue")
remove_parser.add_argument("queue", help="Queue to enable (e.g., main.q@node42.univ-rennes.fr, main.q, etc.)")
remove_parser.add_argument("--disable-id", required=True, help="id of the disable request to remove")
remove_parser.add_argument("--reason", default="Manual removal", help="Reason for the removal")
# show-disable-requests action
show_parser = subparsers.add_parser("show-disable-requests", help="Show disable requests")
show_parser.add_argument("queue", nargs="?", default=None, help="Optional: specific queue to show (if omitted, shows all)")
try:
args = parser.parse_args()
if args.action is None:
parser.print_help()
exit(1)
test_mode = args.test
if test_mode:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.warning("Running in test mode with MockGridEngine. No actual queue will be enabled or disabled.")
else:
logging.basicConfig(level=logging.WARNING)
if test_mode:
queues_status_file_path = Path('./quman_test/queues_status.json')
if queues_status_file_path.exists():
logging.info("Loading queues status from %s", queues_status_file_path)
qs = QueuesStatus(queues_status_file_path)
else:
logging.info("No queues status file found at %s. Initializing with default queues status.", queues_status_file_path)
qs = QueuesStatus()
for node_id in range(40, 44):
qs.add_queue(f'main.q@alambix{node_id}.ipr.univ-rennes.fr', True)
qs.add_queue('gpuonly.q@alambix42', True)
grid_engine = MockGridEngine(qs)
else:
grid_engine = Sge()
db_backend = create_db_backend()
quman = QueueManager(db_backend, grid_engine)
quman.synchronize_with_grid_engine()
if args.action == "add-disable-request":
queue_machines = quman.get_queue_machines(args.queue)
quman.request_queue_machines_deactivation(queue_machines, args.disable_id, args.reason)
elif args.action == "remove-disable-request":
queue_machines = quman.get_queue_machines(args.queue)
quman.request_queue_machines_activation(queue_machines, args.disable_id, args.reason)
elif args.action == "show-disable-requests":
if args.queue is None:
# Show all disable requests
state = quman.get_state()
else:
# Show disable requests for a specific queue
queue_machines = quman.get_queue_machines(args.queue)
state = QueueDisableState()
for queue_machine in queue_machines:
disable_requests = quman.get_disable_requests(queue_machine)
state.set_queue_machine_state(queue_machine, disable_requests)
if args.json:
print(json.dumps(state.as_dict(), indent=2))
else:
for queue_machine, disable_requests in state.state.items():
print(f"Queue machine: {queue_machine}")
for dr in disable_requests:
print(f" Disable request by {dr.user_id} on {dr.host_fqdn} at {dr.timestamp.isoformat()} with disable id '{dr.disable_id}' for reason: {dr.reason}")
if test_mode:
grid_engine.get_status().save_as_json(queues_status_file_path)
except RuntimeError as e:
sys.stderr.write(f"ERROR: {e}\n")
exit(1)
if __name__ == "__main__":
main()


@@ -1,3 +1,6 @@
__version__ = '1.0.17'
class Version(object):
"""
simple version number made of a series of positive integers separated by dots

34
pyproject.toml Normal file

@@ -0,0 +1,34 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "cocluto"
dynamic = ["version"] # the list of fields whose values are discovered by the backend (e.g. __version__)
description = "compute cluster utility tools"
readme = "README.md"
keywords = ["sql", "hpc", "pdu", "power supply", "inventory", "son of grid engine"]
license = {text = "MIT License"}
dependencies = [
"pygraphviz", # requires apt install graphviz-dev
"mysqlclient",
]
requires-python = ">= 3.8"
authors = [
{name = "Guillaume Raffy", email = "guillaume.raffy@univ-rennes.fr"}
]
[project.scripts]
quman = "cocluto.quman:main"
[project.urls]
Repository = "https://git.ipr.univ-rennes.fr/cellinfo/cocluto"
[tool.setuptools]
packages = ["cocluto"]
[tool.setuptools.dynamic]
version = {attr = "cocluto.version.__version__"}
[tool.setuptools.package-data]
cocluto = ["resources/**/*"]


@@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='cocluto',
version=1.06,
version='1.0.9',
description='compute cluster utility tools',
url='https://git.ipr.univ-rennes1.fr/graffy/cocluto',
author='Guillaume Raffy',

61
test/test_quman.py Normal file

@@ -0,0 +1,61 @@
from pathlib import Path
import json
import unittest
import logging
# from cocluto import ClusterController
from cocluto.SimpaDbUtil import SqliteDb
from cocluto.quman import QueueManager, init_db, MockGridEngine, QueuesStatus
class QumanTestCase(unittest.TestCase):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def setUp(self) -> None:
return super().setUp()
def test_quman(self):
logging.info('test_quman')
db_path = Path('./quman_test/quman.sqlite')
db_path.parent.mkdir(exist_ok=True)
if db_path.exists():
db_path.unlink()
db_backend = SqliteDb(db_path)
init_db(db_backend)
qs = QueuesStatus()
for node_id in range(40, 44):
qs.add_queue(f'main.q@alambix{node_id}', True)
qs.add_queue('gpuonly.q@alambix42', True)
grid_engine = MockGridEngine(qs)
grid_engine.disable_queue_machine('main.q@alambix42') # simulate that the queue is already disabled
quman = QueueManager(db_backend, grid_engine)
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')
print(json.dumps(quman.get_state().as_dict(), indent=2))
print('synchronizing with grid engine...')
quman.synchronize_with_grid_engine()
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')
print(json.dumps(quman.get_state().as_dict(), indent=2))
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
with self.assertRaises(RuntimeError):
# attempting to disable the same queue again with the same disable id should raise an error (the id uniquely identifies the disable requests on the machine)
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'because I want to test quman')
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'croconaus.maco-update', 'disabled to update maco')
quman.request_queue_machines_activation(['main.q@alambix42'], 'sysadmin.graffy', 'alambix42 has been moved to a new rack')
main_queue_machines = quman.get_queue_machines('main.q')
quman.request_queue_machines_deactivation(main_queue_machines, 'sysadmin.graffy.bug4242', 'disable all cluster to prepare complete shutdown')
main_queue_machines = quman.get_queue_machines('main.q')
quman.request_queue_machines_activation(main_queue_machines, 'sysadmin.graffy.bug4242', 'electricity is back, reactivating all cluster')
db_backend.dump(Path('./quman_test/quman_dump.sql'))
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')
print(json.dumps(quman.get_state().as_dict(), indent=2))
if __name__ == '__main__':
unittest.main()