Compare commits
16 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
f64e31b420 | |
|
|
04adbbc684 | |
|
|
25afd32504 | |
|
|
4cc541d9c3 | |
|
|
bfe1eeb084 | |
|
|
12be70500b | |
|
|
7acaa1ad5a | |
|
|
0f0d5f800e | |
|
|
f5dce0bf10 | |
|
|
a7d92a3f99 | |
|
|
3beba78ecc | |
|
|
ba3d410b3f | |
|
|
a8b477978d | |
|
|
8a2c46c377 | |
|
|
8679ae2ca5 | |
|
|
27493f2ed7 |
|
|
@ -0,0 +1,59 @@
|
|||
// Jenkinsfile for jenkins.ipr.univ-rennes1.fr (Institut de Physique de Rennes)
pipeline {
    // run on the dedicated alambix build agent
    agent {label 'alambix_agent'}
    environment {
        // python virtual environment where cocluto and its dependencies get installed
        VENV_PATH = "${WORKSPACE}/cocluto.venv"
    }
    stages {
        stage('setup') {
            steps {
                echo 'setting up itinv test environment'
                // create the virtual environment and install this package (and its dependencies) into it
                sh """#!/bin/bash
python3 -m venv ${VENV_PATH} &&
source ${VENV_PATH}/bin/activate &&
pip install --upgrade pip &&
pip install --upgrade setuptools &&
pip install .
"""
            }
        }
        stage('testing cocluto (cluster tools)') {
            steps {
                // errexit: abort the stage on the first failing command
                sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_cocluto
"""
            }
        }
        stage('testing simpadb') {
            steps {
                sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_simpadb
"""
            }
        }
        stage('testing quman') {
            steps {
                sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_quman
"""
            }
        }
    }
    post
    {
        // always, success, failure, unstable, changed
        failure
        {
            // notify the maintainers by email when the build fails
            mail bcc: '', body: "<b>Jenkins build failed</b><br>Project: ${env.JOB_NAME} <br>Build Number: ${env.BUILD_NUMBER} <br>Build URL: ${env.BUILD_URL}", cc: 'guillaume.raffy@univ-rennes.fr, julien.dasilva@univ-rennes.fr', charset: 'UTF-8', from: '', mimeType: 'text/html', replyTo: '', subject: "CI build failed for ${env.JOB_NAME}", to: "info-ipr@univ-rennes1.fr";
        }
        cleanup {
            // wipe the workspace whatever the build outcome
            cleanWs()
        }
    }
}
|
||||
|
|
@ -501,25 +501,29 @@ def power_config_to_svg(power_config: PowerConfig, svg_file_path: Path, worst_ca
|
|||
|
||||
for con in power_config.connections:
|
||||
# print(con.from_plug.machine.name, con.to_plug.machine.name)
|
||||
if not con.is_redundancy_cable(): # don't display redundancy cables, as they might overlap and hide the main one
|
||||
power_consumption = con.get_power_consumption(worst_case_scenario=worst_case_scenario)
|
||||
amperes = power_consumption / 220.0
|
||||
color = '/svg/green'
|
||||
capacity = con.get_max_amperes()
|
||||
penwidth_scaler = 0.25
|
||||
if capacity is None:
|
||||
max_amp = '? A'
|
||||
color = '/svg/red'
|
||||
penwidth = 100.0 * penwidth_scaler # make the problem clearly visible
|
||||
else:
|
||||
max_amp = str(capacity) + 'A'
|
||||
color = cable_colorer.get_cable_color(con, worst_case_scenario=worst_case_scenario)
|
||||
penwidth = capacity * penwidth_scaler
|
||||
label = "%.1f/%s" % (amperes, max_amp)
|
||||
# color='//%d' % int(9.0-amperes/capacity*8)
|
||||
power_consumption = con.get_power_consumption(worst_case_scenario=worst_case_scenario)
|
||||
amperes = power_consumption / 220.0
|
||||
color = '/svg/green'
|
||||
capacity = con.get_max_amperes()
|
||||
penwidth_scaler = 0.25
|
||||
if capacity is None:
|
||||
max_amp = '? A'
|
||||
color = '/svg/red'
|
||||
penwidth = 100.0 * penwidth_scaler # make the problem clearly visible
|
||||
else:
|
||||
max_amp = str(capacity) + 'A'
|
||||
color = cable_colorer.get_cable_color(con, worst_case_scenario=worst_case_scenario)
|
||||
penwidth = capacity * penwidth_scaler
|
||||
label = "%.1f/%s" % (amperes, max_amp)
|
||||
# color='//%d' % int(9.0-amperes/capacity*8)
|
||||
|
||||
# graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color="%s:%s" % (color, wsc_color), label=label, penwidth="%s:%s" % (penwidth, penwidth))
|
||||
graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color=color, label=label, penwidth=penwidth)
|
||||
if con.is_redundancy_cable():
|
||||
edge_style = 'dashed'
|
||||
else:
|
||||
edge_style = 'solid'
|
||||
|
||||
# graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color="%s:%s" % (color, wsc_color), label=label, penwidth="%s:%s" % (penwidth, penwidth))
|
||||
graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color=color, label=label, penwidth=penwidth, style=edge_style)
|
||||
|
||||
for rack_id, rack in racks.items():
|
||||
# sub = graph.add_subgraph(rack, name='cluster_%s' % rack_id, rank='same')
|
||||
|
|
@ -534,3 +538,4 @@ def power_config_to_svg(power_config: PowerConfig, svg_file_path: Path, worst_ca
|
|||
|
||||
graph.layout(prog='dot')
|
||||
graph.draw(svg_file_path)
|
||||
|
||||
|
|
|
|||
|
|
@ -253,7 +253,7 @@ class SqliteDb(ISqlDatabaseBackend):
|
|||
# If set False, the returned connection may be shared across multiple threads. When using multiple threads with the same connection writing operations should be serialized by the user to avoid data corruption
|
||||
# I hope it's safe here but I'm not 100% sure though. Anyway, if the database gets corrupt, it not a big deal since this memory resident database gets reconstructed from the sql file...
|
||||
|
||||
if sqlite_db_path != ':memory:' and not sqlite_db_path.exists():
|
||||
if sqlite_db_path == ':memory:' or not sqlite_db_path.exists():
|
||||
logging.debug('creating sqlite database in %s', sqlite_db_path)
|
||||
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -40,6 +40,10 @@ def is_test_machine(name):
|
|||
'physix13',
|
||||
'physix14',
|
||||
'physix15',
|
||||
'alambix12',
|
||||
'alambix13',
|
||||
'alambix14',
|
||||
'alambix15',
|
||||
]
|
||||
|
||||
|
||||
|
|
@ -315,7 +319,7 @@ def draw_machine_age_pyramid_graph(inventory):
|
|||
:param Inventory inventory: the inventory database
|
||||
"""
|
||||
|
||||
oldest_age = 20
|
||||
oldest_age = 25
|
||||
age_histogram = np.zeros(shape=(oldest_age))
|
||||
|
||||
rows = inventory.query("SELECT * FROM machines")
|
||||
|
|
@ -328,7 +332,9 @@ def draw_machine_age_pyramid_graph(inventory):
|
|||
if purchase_date is not None:
|
||||
purchase_time = matplotlib.dates.date2num(purchase_date.date()) # noqa: F841
|
||||
age = datetime.datetime.now() - purchase_date
|
||||
age_histogram[age.days // 365] += 1
|
||||
age_in_years = age.days // 365
|
||||
if age_in_years < oldest_age:
|
||||
age_histogram[age_in_years] += 1
|
||||
# print(name, age)
|
||||
|
||||
fig, ax = plt.subplots()
|
||||
|
|
@ -349,7 +355,7 @@ def draw_core_age_pyramid_graph(inventory):
|
|||
:param Inventory inventory: the inventory database
|
||||
"""
|
||||
|
||||
oldest_age = 20
|
||||
oldest_age = 25
|
||||
age_histogram = np.zeros(shape=(oldest_age))
|
||||
|
||||
rows = inventory.query("SELECT * FROM machines")
|
||||
|
|
@ -362,7 +368,9 @@ def draw_core_age_pyramid_graph(inventory):
|
|||
if purchase_date is not None:
|
||||
purchase_time = matplotlib.dates.date2num(purchase_date.date()) # noqa: F841
|
||||
age = datetime.datetime.now() - purchase_date
|
||||
age_histogram[age.days // 365] += inventory.get_num_cores(name)
|
||||
age_in_years = age.days // 365
|
||||
if age_in_years < oldest_age:
|
||||
age_histogram[age_in_years] += inventory.get_num_cores(name)
|
||||
# print(name, age)
|
||||
|
||||
fig, ax = plt.subplots()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,469 @@
|
|||
#!/usr/bin/env python3
|
||||
from abc import ABC, abstractmethod
|
||||
import sys
|
||||
from typing import List, Dict, Any, Union
|
||||
import logging
|
||||
import subprocess
|
||||
import argparse
|
||||
import re
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField # , SqlSshAccessedMysqlDb
|
||||
from cocluto.ClusterController.QstatParser import QstatParser
|
||||
from cocluto.ClusterController.JobsState import JobsState
|
||||
|
||||
# Type aliases used throughout this module; all of them are plain int/str at runtime,
# they only exist to make signatures self-documenting.
LogId = int  # identifies a log entry in the database
DisableId = str  # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc.
QueueId = str  # identifies the queue eg main.q
QueueMachineId = str  # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr
HostFqdn = str  # fully qualified domain name of a host eg alambix42.ipr.univ-rennes.fr
UserId = str  # unix user id of a user eg graffy
|
||||
|
||||
|
||||
def is_queue_machine_id(s: str) -> bool:
    """Tell whether the given string identifies a queue machine.

    A QueueMachineId has the form <queue>@<host> (eg 'main.q@alambix42.ipr.univ-rennes.fr');
    a bare queue id such as 'main.q' is not a QueueMachineId and yields False.
    """
    queue_machine_pattern = r'^[a-zA-Z0-9_\.]+@[a-zA-Z0-9_\-\.]+$'
    return re.match(queue_machine_pattern, s) is not None
|
||||
|
||||
|
||||
class QueuesStatus():
    """Snapshot of the enabled/disabled state of every known queue machine."""
    is_enabled: Dict[QueueMachineId, bool]  # maps each queue machine to True (enabled) or False (disabled)

    def __init__(self, queues_status_file_path: Path = None):
        """Create a status snapshot, optionally populated from a json file."""
        self.is_enabled = {}
        if queues_status_file_path is not None:
            self.load_from_json(queues_status_file_path)

    def load_from_json(self, path: Path):
        """Replace the current state with the one stored in the given json file."""
        with open(path, 'r', encoding='utf-8') as json_file:
            self.is_enabled = json.load(json_file)

    def add_queue(self, queue_machine: QueueMachineId, is_enabled: bool):
        """Record the enabled/disabled state of the given queue machine."""
        self.is_enabled[queue_machine] = is_enabled

    def print(self):
        """Print one line per queue machine, with its state."""
        for queue_machine, enabled in self.is_enabled.items():
            state_label = 'enabled' if enabled else 'disabled'
            print(f"{queue_machine}: {state_label}")

    def save_as_json(self, path: Path):
        """Store the current state into the given json file."""
        with open(path, 'w', encoding='utf-8') as json_file:
            json.dump(self.is_enabled, json_file, indent=2)
|
||||
|
||||
|
||||
class IGridEngine(ABC):
    """Abstract interface to a grid engine that can enable/disable queue machines."""

    @abstractmethod
    def disable_queue_machine(self, queue_machine: QueueMachineId):
        """Prevent the given queue machine from accepting new jobs."""
        ...

    @abstractmethod
    def enable_queue_machine(self, queue_machine: QueueMachineId):
        """Allow the given queue machine to accept new jobs again."""
        ...

    @abstractmethod
    def get_status(self) -> QueuesStatus:
        """Return the enabled/disabled state of all queue machines."""
        ...
|
||||
|
||||
|
||||
class MockGridEngine(IGridEngine):
    """Fake grid engine that only tracks queue states in memory (used by --test mode and unit tests)."""
    queues_status: QueuesStatus  # the in-memory state this mock operates on

    def __init__(self, queues_status: QueuesStatus):
        self.queues_status = queues_status

    def disable_queue_machine(self, queue_machine: QueueMachineId):
        """Mark the given queue machine as disabled; it must exist and currently be enabled."""
        print(f"Mock disable queue {queue_machine}")
        enabled = self.queues_status.is_enabled
        assert queue_machine in enabled, f"Queue {queue_machine} not found in queues status {enabled}"
        assert enabled[queue_machine], f"Queue {queue_machine} is already disabled"
        enabled[queue_machine] = False

    def enable_queue_machine(self, queue_machine: QueueMachineId):
        """Mark the given queue machine as enabled; it must exist and currently be disabled."""
        print(f"Mock enable queue {queue_machine}")
        enabled = self.queues_status.is_enabled
        assert queue_machine in enabled, f"Queue machine {queue_machine} not found in queues status"
        assert not enabled[queue_machine], f"Queue machine {queue_machine} is already enabled"
        enabled[queue_machine] = True

    def get_status(self) -> QueuesStatus:
        """Return the in-memory state (not a copy)."""
        return self.queues_status
|
||||
|
||||
|
||||
class Sge(IGridEngine):
    """Interface to a real sge (son of grid engine) cluster, through the qmod/qstat command line tools."""
    dry_run: bool  # when True, qmod commands are printed instead of executed

    def __init__(self, dry_run: bool = False):
        self.dry_run = dry_run
        if self.dry_run:
            return
        # check that qmod command is available before doing anything else
        try:
            subprocess.run(["qmod", "-h"], check=True, capture_output=True)
        except FileNotFoundError as exc:
            raise RuntimeError("qmod command not found. Please make sure that the grid engine client is installed and qmod command is available in the PATH.") from exc

    def run_qmod(self, args):
        """runs qmod with the given arguments."""
        cmd = ["qmod"] + args
        if not self.dry_run:
            try:
                subprocess.run(cmd, check=True)
            except subprocess.CalledProcessError as e:
                raise RuntimeError(f"qmod command failed: {e}") from e
        else:
            print(f"Dry run: {' '.join(cmd)}")

    def disable_queue_machine(self, queue_machine: QueueMachineId):
        """Disable the given queue machine (qmod -d)."""
        self.run_qmod(["-d", queue_machine])

    def enable_queue_machine(self, queue_machine: QueueMachineId):
        """Enable the given queue machine (qmod -e)."""
        self.run_qmod(["-e", queue_machine])

    def get_status(self) -> QueuesStatus:
        """Build the queues status from the output of 'qstat -f -u *'."""
        completed = subprocess.run(['qstat', '-f', '-u', '*'], check=True, capture_output=True)
        # delegate the actual parsing of the qstat output to QstatParser
        jobs_state: JobsState = QstatParser().parse_qstat_output(completed.stdout.decode())
        queues_status = QueuesStatus()
        for queue_machine in jobs_state.get_queue_machines().values():
            queues_status.add_queue(queue_machine.get_name(), queue_machine.is_enabled())
        return queues_status
|
||||
|
||||
|
||||
def init_db(db_backend: ISqlDatabaseBackend):
    """Create the tables used by quman ('log', 'disables', 'queues') if they don't exist yet."""
    Field = SqlTableField
    StringType = SqlTableField.Type.FIELD_TYPE_STRING

    # 'log': the history of actions (queue activation or deactivation)
    if not db_backend.table_exists('log'):
        db_backend.create_table('log', [
            Field('id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the modification', is_autoinc_index=True),
            Field('timestamp', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this modification has been made'),
            Field('user_id', StringType, 'the id of user performing the action (unix user id)'),
            Field('host_fqdn', StringType, 'the fully qualified domain name of the host on which the action is performed'),
            Field('queue_machines', StringType, 'the comma separated list of queue machines that were modified'),
            Field('action', StringType, 'the action performed: "disable" or "enable"'),
            Field('disable_id', StringType, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'),
            Field('reason', StringType, 'the reason for the modification'),
        ])

    # 'disables': the disable requests that are currently in effect
    if not db_backend.table_exists('disables'):
        db_backend.create_table('disables', [
            Field('disable_request_id', SqlTableField.Type.FIELD_TYPE_INT, 'log.id of the disable action that led to this state'),
            Field('queue_machine', StringType, 'the name of the queue machine that was disabled'),
        ])

    # 'queues': the queue machines currently known to the grid engine
    if not db_backend.table_exists('queues'):
        db_backend.create_table('queues', [
            Field('queue_machine', StringType, 'the name of the queue'),
        ])
|
||||
|
||||
|
||||
def create_db_backend() -> ISqlDatabaseBackend:
    """Create, initialize and return the sql database backend used by quman."""
    # production (mysql over ssh) settings, currently unused:
    # db_server_fqdn = 'alambix-master.ipr.univ-rennes.fr'
    # db_user = 'qumanw'
    # db_name = 'quman'
    # ssh_user = 'qumanw'
    # backend = SshAccessedMysqlDb(db_server_fqdn, db_user, db_name, ssh_user)
    db_backend = SqliteDb(Path('./quman_test/quman.sqlite'))
    init_db(db_backend)
    return db_backend
|
||||
|
||||
|
||||
class DisableRequest:
    """A request to disable one or more queue machines, as recorded in the 'log' table of the database."""
    # NOTE: the first attribute was declared as 'db_id' but no code ever sets it; __init__
    # stores the value in 'log_id', so the annotation is fixed accordingly.
    # Alias annotations are quoted (lazy) so they don't need to be resolvable at class-creation time.
    log_id: int  # index of this disable request in the database's log table
    user_id: 'UserId'  # the id of the user who requested the disable action (unix user id)
    host_fqdn: 'HostFqdn'  # the fully qualified domain name of the host on which the disable action was requested
    queue_machines: 'List[QueueMachineId]'  # the names of the queue machines that were modified
    reason: str  # free-form explanation of why the queue machines were disabled
    disable_id: 'DisableId'  # tag that identifies the disable request (eg auto.croconaus, manual.graffy)
    timestamp: datetime  # when the disable request was made

    def __init__(self, log_id: int, user_id: 'UserId', host_fqdn: 'HostFqdn', queue_machines: 'List[QueueMachineId]', reason: str, disable_id: 'DisableId', timestamp: datetime):
        self.log_id = log_id
        self.user_id = user_id
        self.host_fqdn = host_fqdn
        self.queue_machines = queue_machines
        self.reason = reason
        self.disable_id = disable_id
        self.timestamp = timestamp

    def as_dict(self) -> Dict[str, Any]:
        """Return a json-serializable representation of this disable request."""
        return {
            "log_id": self.log_id,
            "user_id": self.user_id,
            "host_fqdn": self.host_fqdn,
            "queue_machines": self.queue_machines,
            "reason": self.reason,
            "disable_id": self.disable_id,
            "timestamp": self.timestamp.isoformat(),
        }
|
||||
|
||||
|
||||
class QueueDisableState:
    """Maps each queue machine to the list of disable requests currently applying to it."""
    # quoted (lazy) annotation: the referenced names don't need to exist at class-creation time
    state: 'Dict[QueueMachineId, List[DisableRequest]]'

    def __init__(self):
        self.state = {}

    def set_queue_machine_state(self, queue_machine: 'QueueMachineId', disable_requests: 'List[DisableRequest]'):
        """Record the disable requests that currently apply to the given queue machine."""
        assert all(isinstance(dr, DisableRequest) for dr in disable_requests)
        self.state[queue_machine] = disable_requests

    def as_dict(self) -> Dict[str, Any]:
        """Return a json-serializable representation of this state.

        (fixed: the return type was wrongly annotated as str although a dict is returned)
        """
        self_as_dict = {}
        for queue_machine, disable_requests in self.state.items():
            self_as_dict[queue_machine] = {
                "disable_requests": [dr.as_dict() for dr in disable_requests]
            }
        return self_as_dict
|
||||
|
||||
|
||||
class QueueManager:
    """Manages disable/enable requests on queue machines, keeping the grid engine and the database in sync.

    Several independent disable requests (each identified by a DisableId) can apply to the same
    queue machine; the machine is only re-enabled in the grid engine when the last request is removed.

    NOTE(review): all sql queries below are built by f-string interpolation. The values come from
    the command line / the grid engine, so injection is unlikely to be hostile here, but a reason
    containing a single quote will break the query; parameterized queries should be used if the
    ISqlDatabaseBackend interface ever supports them.
    """
    db_backend: ISqlDatabaseBackend  # where the log/disables/queues tables live
    grid_engine: IGridEngine  # the grid engine that actually enables/disables the queues

    def __init__(self, db_backend: ISqlDatabaseBackend, grid_engine: IGridEngine = None):
        """If no grid engine is given, a real Sge instance is used."""
        self.db_backend = db_backend
        if grid_engine is None:
            grid_engine = Sge()
        self.grid_engine = grid_engine

    def log_modification(self, queue_machines: List[QueueMachineId], action: str, disable_id: DisableId, reason: str) -> LogId:
        """Append one entry to the 'log' table and return its id."""
        assert isinstance(queue_machines, list)
        assert action in ["disable", "enable"], "Action must be either 'disable' or 'enable'"
        userid = subprocess.check_output(['whoami']).decode().strip()
        host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip()
        timestamp = datetime.now().isoformat()
        sql_query = f"INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_id, reason) VALUES ('{timestamp}', '{userid}', '{host_fqdn}', '{','.join(queue_machines)}', '{action}', '{disable_id}', '{reason}');"
        self.db_backend.query(sql_query)
        # get the log id of the disable action that was just inserted
        log_id = self.db_backend.query("SELECT last_insert_rowid();")[0][0]
        return log_id

    def get_disable_requests(self, queue_machine: QueueMachineId) -> List[DisableRequest]:
        """Return the disable requests currently in effect for the given queue machine."""
        sql_query = f"SELECT log.id, log.user_id, log.host_fqdn, log.queue_machines, log.reason, log.disable_id, log.timestamp FROM log JOIN disables ON log.id = disables.disable_request_id WHERE disables.queue_machine = '{queue_machine}' AND log.action = 'disable';"
        results = self.db_backend.query(sql_query)
        disable_requests = []
        for row in results:
            log_id = row[0]
            user_id = row[1]
            host_fqdn = row[2]
            queue_machines = row[3].split(',')
            # fixed: this assertion used to compare queue_machine with itself (always true);
            # the intent is to check that this log entry does concern the requested queue machine
            assert queue_machine in queue_machines, "All results should be for the same queue"
            reason = row[4]
            disable_id = row[5]
            timestamp = datetime.fromisoformat(row[6])
            disable_requests.append(DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_id=disable_id, timestamp=timestamp))
        return disable_requests

    def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_disable: bool = True):
        """Record a disable request for the given queue machines, and (unless perform_disable is False) actually disable them in the grid engine."""
        logging.info("Requesting deactivation of queue machines %s with disable id '%s' for reason: %s", queue_machines, disable_id, reason)
        # first validate: the same disable_id must not already apply to any of the queue machines
        all_disable_requests = {}
        for queue_machine in queue_machines:
            all_disable_requests[queue_machine] = self.get_disable_requests(queue_machine)
            disable_requests = all_disable_requests[queue_machine]
            for dr in disable_requests:
                if dr.disable_id == disable_id:
                    raise RuntimeError(f"Disable id {disable_id} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first.")

        for queue_machine in queue_machines:
            if perform_disable:
                disable_requests = all_disable_requests[queue_machine]
                if len(disable_requests) == 0:
                    # queue is currently active, we can disable it
                    self.grid_engine.disable_queue_machine(queue_machine)

        # record the request: one log entry, one 'disables' row per queue machine
        disable_log_id = self.log_modification(queue_machines, "disable", disable_id, reason)
        for queue_machine in queue_machines:
            self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_machine) VALUES ({disable_log_id}, '{queue_machine}');")

    def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_enable: bool = True):
        """Remove the disable request identified by disable_id from the given queue machines, re-enabling each machine in the grid engine once its last request is gone (unless perform_enable is False)."""
        for queue_machine in queue_machines:
            disable_requests = self.get_disable_requests(queue_machine)
            dr_to_remove = None  # the disable reason to remove
            for dr in disable_requests:
                if dr.disable_id == disable_id:
                    dr_to_remove = dr
                    break

            if dr_to_remove is None:
                raise RuntimeError(f"Disable id {disable_id} has not requested deactivation of queue {queue_machine}. Cannot request activation without a prior deactivation.")

            if perform_enable:
                if len(disable_requests) == 1:
                    # queue is currently disabled and there is only one disable reason, we can enable it
                    self.grid_engine.enable_queue_machine(queue_machine)
        enable_log_id = self.log_modification(queue_machines, "enable", disable_id, reason)  # noqa: F841 pylint:disable=unused-variable
        # NOTE(review): dr_to_remove holds the request found for the LAST queue machine of the
        # loop above; this is only correct if all given machines share the same disable log entry
        # for this disable_id — TODO confirm
        for queue_machine in queue_machines:
            self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';")

    def synchronize_with_grid_engine(self):
        """synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat."""
        qs = self.grid_engine.get_status()

        # collect the queue machines currently known to the database
        db_queues = set()
        sql_query = "SELECT queue_machine FROM queues;"
        results = self.db_backend.query(sql_query)
        for row in results:
            assert len(row) == 1, "Each row should have only one column (queue_machine)"
            db_queues.add(row[0])

        for queue_machine, is_enabled in qs.is_enabled.items():
            if queue_machine not in db_queues:
                # if the queue is not in the database, we add it
                logging.warning("Queue %s is not in the database, adding it.", queue_machine)
                self.db_backend.query(f"INSERT INTO queues (queue_machine) VALUES ('{queue_machine}');")
            else:
                db_queues.remove(queue_machine)  # we remove it from the set of queues in the database to keep track of the queues that are in the database but not in the grid engine
            disable_requests = self.get_disable_requests(queue_machine)
            if not is_enabled and len(disable_requests) == 0:
                # queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_id "quman-sync" and reason "synchronized with grid engine"
                self.request_queue_machines_deactivation([queue_machine], "quman-sync", "synchronized with grid engine", perform_disable=False)
                assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none."
            elif is_enabled and len(disable_requests) > 0:
                # queue is enabled in the grid engine but there are disable requests in the database, we remove all of them
                for dr in disable_requests:
                    self.request_queue_machines_activation([queue_machine], dr.disable_id, "synchronized with grid engine", perform_enable=False)
                assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests."
        # the remaining db_queues no longer exist in the grid engine: purge them
        for queue_machine in db_queues:
            logging.warning("Queue %s is in the database but not in the grid engine. Removing it from the database.", queue_machine)
            self.db_backend.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';")
            self.db_backend.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';")

    def get_state(self) -> QueueDisableState:
        """returns the state of the queues."""
        # get the list of queue names from the disables table in the database
        sql_query = "SELECT DISTINCT queue_machine FROM disables;"
        results = self.db_backend.query(sql_query)
        for row in results:
            assert len(row) == 1, "Each row should have only one column (queue_machine)"
        queue_machines = [row[0] for row in results]

        state = QueueDisableState()
        for queue_machine in queue_machines:
            disable_requests = self.get_disable_requests(queue_machine)
            state.set_queue_machine_state(queue_machine, disable_requests)
        return state

    def get_queue_machines(self, queue_machine: Union[QueueMachineId, QueueId]) -> List[QueueMachineId]:
        """Expand the given identifier into a list of queue machines.

        A full QueueMachineId (queue@host) is returned as-is; a bare QueueId (eg 'main.q')
        is expanded to all matching queue machines known to the grid engine.
        """
        logging.debug("Getting queue machines for queue_machine identifier '%s'", queue_machine)
        queue_machines = []
        if is_queue_machine_id(queue_machine):
            queue_machines = [queue_machine]
        else:
            status = self.grid_engine.get_status()
            queue_machines = [q for q in status.is_enabled if q.startswith(queue_machine)]
        assert len(queue_machines) > 0, f"No queue machine found for queue identifier '{queue_machine}'"
        logging.debug("Found queue machines %s for queue_machine identifier '%s'", queue_machines, queue_machine)
        return queue_machines
|
||||
|
||||
|
||||
def main():
    """Entry point of the quman command line tool: parse the command line and dispatch the requested action."""
    parser = argparse.ArgumentParser(
        description="qman: manage queue disable/enable requests with logging",
        prog="quman",
        epilog="Example usage:\n"
        " quman add-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance'\n"
        " quman remove-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance completed'\n"
        " quman add-disable-request main.q --disable-id admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'\n"
        " quman show-disable-requests main.q@node42\n"
        " quman show-disable-requests\n",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    subparsers = parser.add_subparsers(dest="action", help="Action to perform")
    parser.add_argument("--test", action="store_true", help="Run in test mode with MockGridEngine and a local sqlite database. This is meant for testing and development purposes and should not be used in production.")
    parser.add_argument("--json", action="store_true", help="Output results in JSON format.")

    # add-disable action
    add_parser = subparsers.add_parser("add-disable-request", help="adds a disable request to a queue")
    add_parser.add_argument("queue", help="Queue to disable (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)")
    add_parser.add_argument("--disable-id", required=True, help="id for the disable request (e.g., croconaus, manual.graffy, etc.)")
    add_parser.add_argument("--reason", required=True, help="Reason for the disabling")

    # remove-disable action
    remove_parser = subparsers.add_parser("remove-disable-request", help="removes a disable request on a queue")
    remove_parser.add_argument("queue", help="Queue to enable (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)")
    remove_parser.add_argument("--disable-id", required=True, help="id of the disable request to remove")
    remove_parser.add_argument("--reason", default="Manual removal", help="Reason for the removal")

    # show-disable-requests action
    show_parser = subparsers.add_parser("show-disable-requests", help="Show disable requests")
    show_parser.add_argument("queue", nargs="?", default=None, help="Optional: specific queue to show (if omitted, shows all)")

    try:

        args = parser.parse_args()
        if args.action is None:
            # no subcommand given: show usage and fail
            # NOTE(review): exit() relies on the site module; sys.exit() would be more robust
            parser.print_help()
            exit(1)

        test_mode = args.test

        # configure logging: verbose in test mode, warnings only otherwise
        if test_mode:
            logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
            logging.warning("Running in test mode with MockGridEngine. No actual queue will be enabled or disabled.")

        else:
            logging.basicConfig(level=logging.WARNING)

        # choose the grid engine: a json-backed mock in test mode, the real sge otherwise
        if test_mode:
            queues_status_file_path = Path('./quman_test/queues_status.json')
            if queues_status_file_path.exists():
                logging.info("Loading queues status from %s", queues_status_file_path)
                qs = QueuesStatus(queues_status_file_path)
            else:
                logging.info("No queues status file found at %s. Initializing with default queues status.", queues_status_file_path)
                qs = QueuesStatus()
                # default fake cluster: 4 nodes on main.q plus one gpu queue machine
                for node_id in range(40, 44):
                    qs.add_queue(f'main.q@alambix{node_id}.ipr.univ-rennes.fr', True)
                qs.add_queue('gpuonly.q@alambix42', True)
            grid_engine = MockGridEngine(qs)
        else:
            grid_engine = Sge()
        db_backend = create_db_backend()
        quman = QueueManager(db_backend, grid_engine)

        # make sure the database reflects the actual state of the grid engine before acting
        quman.synchronize_with_grid_engine()

        if args.action == "add-disable-request":
            queue_machines = quman.get_queue_machines(args.queue)
            quman.request_queue_machines_deactivation(queue_machines, args.disable_id, args.reason)
        elif args.action == "remove-disable-request":
            queue_machines = quman.get_queue_machines(args.queue)
            quman.request_queue_machines_activation(queue_machines, args.disable_id, args.reason)
        elif args.action == "show-disable-requests":
            if args.queue is None:
                # Show all disable requests
                state = quman.get_state()
            else:
                # Show disable requests for a specific queue
                queue_machines = quman.get_queue_machines(args.queue)
                state = QueueDisableState()
                for queue_machine in queue_machines:
                    disable_requests = quman.get_disable_requests(queue_machine)
                    state.set_queue_machine_state(queue_machine, disable_requests)
            if args.json:
                print(json.dumps(state.as_dict(), indent=2))
            else:
                for queue_machine, disable_requests in state.state.items():
                    print(f"Queue machine: {queue_machine}")
                    for dr in disable_requests:
                        print(f" Disable request by {dr.user_id} on {dr.host_fqdn} at {dr.timestamp.isoformat()} with disable id '{dr.disable_id}' for reason: {dr.reason}")

        # in test mode, persist the mock's state so the next run starts from it
        if test_mode:
            grid_engine.get_status().save_as_json(queues_status_file_path)

    except RuntimeError as e:
        sys.stderr.write(f"ERROR: {e}\n")
        exit(1)
|
||||
|
||||
|
||||
# run the command line interface when executed as a script
if __name__ == "__main__":
    main()
|
||||
|
|
@ -1,3 +1,6 @@
|
|||
__version__ = '1.0.17'
|
||||
|
||||
|
||||
class Version(object):
|
||||
"""
|
||||
simple version number made of a series of positive integers separated by dots
|
||||
|
|
|
|||
|
|
@ -0,0 +1,34 @@
|
|||
[build-system]
|
||||
requires = ["setuptools"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "cocluto"
|
||||
dynamic = ["version"] # the list of fields whose values are discovered by the backend (eg __version__)
|
||||
description = "compute cluster utility tools"
|
||||
readme = "README.md"
|
||||
keywords = ["sql", "hpc", "pdu", "power supply", "inventory", "son of grid engine"]
|
||||
license = {text = "MIT License"}
|
||||
dependencies = [
|
||||
"pygraphviz", # requires apt install graphviz-dev
|
||||
"mysqlclient",
|
||||
]
|
||||
requires-python = ">= 3.8"
|
||||
authors = [
|
||||
{name = "Guillaume Raffy", email = "guillaume.raffy@univ-rennes.fr"}
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
quman = "cocluto.quman:main"
|
||||
|
||||
[project.urls]
|
||||
Repository = "https://git.ipr.univ-rennes.fr/cellinfo/cocluto"
|
||||
|
||||
[tool.setuptools]
|
||||
packages = ["cocluto"]
|
||||
|
||||
[tool.setuptools.dynamic]
|
||||
version = {attr = "cocluto.version.__version__"}
|
||||
|
||||
[tool.setuptools.package-data]
|
||||
cocluto = ["resources/**/*"]  # NOTE(review): key was "iprbench" — looks copied from another project; this project only packages "cocluto", so package-data under "iprbench" would never be included — confirm
|
||||
2
setup.py
2
setup.py
|
|
@ -2,7 +2,7 @@ from setuptools import setup
|
|||
|
||||
setup(
|
||||
name='cocluto',
|
||||
version=1.06,
|
||||
version='1.0.9',
|
||||
description='compute cluster utility tools',
|
||||
url='https://git.ipr.univ-rennes1.fr/graffy/cocluto',
|
||||
author='Guillaume Raffy',
|
||||
|
|
|
|||
|
|
@ -0,0 +1,61 @@
|
|||
from pathlib import Path
|
||||
import json
|
||||
import unittest
|
||||
import logging
|
||||
# from cocluto import ClusterController
|
||||
from cocluto.SimpaDbUtil import SqliteDb
|
||||
from cocluto.quman import QueueManager, init_db, MockGridEngine, QueuesStatus
|
||||
|
||||
|
||||
class QumanTestCase(unittest.TestCase):
    """Integration test for cocluto's queue manager (quman).

    Drives a QueueManager backed by a fresh sqlite database and a
    MockGridEngine, then exercises the disable/enable request workflow
    on individual queue machines and on a whole queue.
    """

    # Configure root logging once for the whole test class (runs at class
    # definition time, not per-test).
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    def setUp(self) -> None:
        # No per-test fixture beyond the base class behavior.
        return super().setUp()

    def test_quman(self):
        """End-to-end scenario: sync, disable/enable requests, duplicate-request error, bulk queue operations."""
        logging.info('test_quman')
        # Start from an empty database so the test is reproducible.
        db_path = Path('./quman_test/quman.sqlite')
        db_path.parent.mkdir(exist_ok=True)
        if db_path.exists():
            db_path.unlink()
        db_backend = SqliteDb(db_path)
        init_db(db_backend)
        # Build a mock grid engine with 4 machines in main.q (alambix40..43)
        # plus one gpuonly.q queue machine on alambix42.
        qs = QueuesStatus()
        for node_id in range(40, 44):
            qs.add_queue(f'main.q@alambix{node_id}', True)
        qs.add_queue('gpuonly.q@alambix42', True)
        grid_engine = MockGridEngine(qs)
        grid_engine.disable_queue_machine('main.q@alambix42')  # simulate that the queue is already disabled
        quman = QueueManager(db_backend, grid_engine)
        print('queues state:')
        grid_engine.queues_status.print()
        print('disable requests:')
        print(json.dumps(quman.get_state().as_dict(), indent=2))
        # Import the pre-existing disabled state from the (mock) grid engine
        # into quman's database.
        print('synchronizing with grid engine...')
        quman.synchronize_with_grid_engine()
        print('queues state:')
        grid_engine.queues_status.print()
        print('disable requests:')
        print(json.dumps(quman.get_state().as_dict(), indent=2))
        quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
        with self.assertRaises(RuntimeError):
            # attempting to disable the same queue again with the same disable id should raise a RuntimeError (the id is used to uniquely identify the disables on the machine)
            quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'because I want to test quman')
        # A second disable request with a *different* id on the same queue
        # machine is allowed (requests stack per disable id).
        quman.request_queue_machines_deactivation(['main.q@alambix42'], 'croconaus.maco-update', 'disabled to update maco')
        quman.request_queue_machines_activation(['main.q@alambix42'], 'sysadmin.graffy', 'alambix42 has been moved to a new rack')
        # Bulk operation: disable then re-enable every machine of main.q.
        main_queue_machines = quman.get_queue_machines('main.q')
        quman.request_queue_machines_deactivation(main_queue_machines, 'sysadmin.graffy.bug4242', 'disable all cluster to prepare complete shutdown')
        main_queue_machines = quman.get_queue_machines('main.q')
        quman.request_queue_machines_activation(main_queue_machines, 'sysadmin.graffy.bug4242', 'electricity is back, reactivating all cluster')

        # Dump the final database contents for manual inspection.
        db_backend.dump(Path('./quman_test/quman_dump.sql'))
        print('queues state:')
        grid_engine.queues_status.print()
        print('disable requests:')
        print(json.dumps(quman.get_state().as_dict(), indent=2))
|
||||
|
||||
|
||||
# Allow running this test module directly (python test/test_quman.py) in
# addition to discovery via `python -m unittest`.
if __name__ == '__main__':
    unittest.main()
|
||||
Loading…
Reference in New Issue