Compare commits
32 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
9305967d10 | |
|
|
73522f83ed | |
|
|
e17287fcf8 | |
|
|
d9e2199d2e | |
|
|
a7f65a9e66 | |
|
|
0cac428957 | |
|
|
d2c973c7ed | |
|
|
5856ac0951 | |
|
|
aa6c84ef80 | |
|
|
24bfb1af95 | |
|
|
fb5e9fcfa4 | |
|
|
eef27f1dd2 | |
|
|
6974f51221 | |
|
|
c80e3cd382 | |
|
|
9266ad8278 | |
|
|
381ec9f1af | |
|
|
f64e31b420 | |
|
|
04adbbc684 | |
|
|
25afd32504 | |
|
|
4cc541d9c3 | |
|
|
bfe1eeb084 | |
|
|
12be70500b | |
|
|
7acaa1ad5a | |
|
|
0f0d5f800e | |
|
|
f5dce0bf10 | |
|
|
a7d92a3f99 | |
|
|
3beba78ecc | |
|
|
ba3d410b3f | |
|
|
a8b477978d | |
|
|
8a2c46c377 | |
|
|
8679ae2ca5 | |
|
|
27493f2ed7 |
|
|
@ -0,0 +1,68 @@
|
|||
// Jenkinsfile for jenkins.ipr.univ-rennes1.fr (Institut de Physique de Rennes)
|
||||
pipeline {
|
||||
agent {label 'alambix_agent'}
|
||||
environment {
|
||||
VENV_PATH = "${WORKSPACE}/cocluto.venv"
|
||||
}
|
||||
stages {
|
||||
stage('setup') {
|
||||
steps {
|
||||
echo 'setting up itinv test environment'
|
||||
sh """#!/bin/bash
|
||||
python3 -m venv ${VENV_PATH} &&
|
||||
source ${VENV_PATH}/bin/activate &&
|
||||
pip install --upgrade pip &&
|
||||
pip install --upgrade setuptools &&
|
||||
pip install .
|
||||
"""
|
||||
}
|
||||
}
|
||||
stage('testing cocluto (cluster tools)') {
|
||||
steps {
|
||||
sh """#!/bin/bash
|
||||
set -o errexit
|
||||
source ${VENV_PATH}/bin/activate &&
|
||||
python3 -m unittest test.test_cocluto
|
||||
"""
|
||||
}
|
||||
}
|
||||
stage('testing simpadb') {
|
||||
steps {
|
||||
sh """#!/bin/bash
|
||||
set -o errexit
|
||||
source ${VENV_PATH}/bin/activate &&
|
||||
python3 -m unittest test.test_simpadb
|
||||
"""
|
||||
}
|
||||
}
|
||||
stage('testing sqldb') {
|
||||
steps {
|
||||
sh """#!/bin/bash
|
||||
set -o errexit
|
||||
source ${VENV_PATH}/bin/activate &&
|
||||
python3 -m unittest test.test_sqldb
|
||||
"""
|
||||
}
|
||||
}
|
||||
stage('testing quman') {
|
||||
steps {
|
||||
sh """#!/bin/bash
|
||||
set -o errexit
|
||||
source ${VENV_PATH}/bin/activate &&
|
||||
python3 -m unittest test.test_quman
|
||||
"""
|
||||
}
|
||||
}
|
||||
}
|
||||
post
|
||||
{
|
||||
// always, success, failure, unstable, changed
|
||||
failure
|
||||
{
|
||||
mail bcc: '', body: "<b>Jenkins build failed</b><br>Project: ${env.JOB_NAME} <br>Build Number: ${env.BUILD_NUMBER} <br>Build URL: ${env.BUILD_URL}", cc: 'guillaume.raffy@univ-rennes.fr, julien.dasilva@univ-rennes.fr', charset: 'UTF-8', from: '', mimeType: 'text/html', replyTo: '', subject: "CI build failed for ${env.JOB_NAME}", to: "info-ipr@univ-rennes1.fr";
|
||||
}
|
||||
cleanup {
|
||||
cleanWs()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -501,25 +501,29 @@ def power_config_to_svg(power_config: PowerConfig, svg_file_path: Path, worst_ca
|
|||
|
||||
for con in power_config.connections:
|
||||
# print(con.from_plug.machine.name, con.to_plug.machine.name)
|
||||
if not con.is_redundancy_cable(): # don't display redundancy cables, as they might overlap and hide the main one
|
||||
power_consumption = con.get_power_consumption(worst_case_scenario=worst_case_scenario)
|
||||
amperes = power_consumption / 220.0
|
||||
color = '/svg/green'
|
||||
capacity = con.get_max_amperes()
|
||||
penwidth_scaler = 0.25
|
||||
if capacity is None:
|
||||
max_amp = '? A'
|
||||
color = '/svg/red'
|
||||
penwidth = 100.0 * penwidth_scaler # make the problem clearly visible
|
||||
else:
|
||||
max_amp = str(capacity) + 'A'
|
||||
color = cable_colorer.get_cable_color(con, worst_case_scenario=worst_case_scenario)
|
||||
penwidth = capacity * penwidth_scaler
|
||||
label = "%.1f/%s" % (amperes, max_amp)
|
||||
# color='//%d' % int(9.0-amperes/capacity*8)
|
||||
power_consumption = con.get_power_consumption(worst_case_scenario=worst_case_scenario)
|
||||
amperes = power_consumption / 220.0
|
||||
color = '/svg/green'
|
||||
capacity = con.get_max_amperes()
|
||||
penwidth_scaler = 0.25
|
||||
if capacity is None:
|
||||
max_amp = '? A'
|
||||
color = '/svg/red'
|
||||
penwidth = 100.0 * penwidth_scaler # make the problem clearly visible
|
||||
else:
|
||||
max_amp = str(capacity) + 'A'
|
||||
color = cable_colorer.get_cable_color(con, worst_case_scenario=worst_case_scenario)
|
||||
penwidth = capacity * penwidth_scaler
|
||||
label = "%.1f/%s" % (amperes, max_amp)
|
||||
# color='//%d' % int(9.0-amperes/capacity*8)
|
||||
|
||||
# graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color="%s:%s" % (color, wsc_color), label=label, penwidth="%s:%s" % (penwidth, penwidth))
|
||||
graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color=color, label=label, penwidth=penwidth)
|
||||
if con.is_redundancy_cable():
|
||||
edge_style = 'dashed'
|
||||
else:
|
||||
edge_style = 'solid'
|
||||
|
||||
# graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color="%s:%s" % (color, wsc_color), label=label, penwidth="%s:%s" % (penwidth, penwidth))
|
||||
graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color=color, label=label, penwidth=penwidth, style=edge_style)
|
||||
|
||||
for rack_id, rack in racks.items():
|
||||
# sub = graph.add_subgraph(rack, name='cluster_%s' % rack_id, rank='same')
|
||||
|
|
@ -534,3 +538,4 @@ def power_config_to_svg(power_config: PowerConfig, svg_file_path: Path, worst_ca
|
|||
|
||||
graph.layout(prog='dot')
|
||||
graph.draw(svg_file_path)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
from typing import Union, List
|
||||
from typing import Union, List, Optional, Tuple
|
||||
from pathlib import Path
|
||||
from enum import Enum
|
||||
import json
|
||||
import logging
|
||||
import MySQLdb # sudo port install py-mysql; sudo apt install python-mysqldb or pip install mysqlclient
|
||||
import time
|
||||
|
|
@ -43,6 +44,7 @@ def is_machine_responding(machineName):
|
|||
|
||||
|
||||
SqlQuery = str
|
||||
Table = List[Tuple]
|
||||
|
||||
|
||||
class SqlTableField():
|
||||
|
|
@ -68,6 +70,10 @@ class SqlTableField():
|
|||
self.is_autoinc_index = is_autoinc_index
|
||||
|
||||
|
||||
ColumnId = str # eg 'matrix_size'
|
||||
TableId = str # eg 'benchmark_results'
|
||||
|
||||
|
||||
class ISqlDatabaseBackend(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
|
@ -79,6 +85,30 @@ class ISqlDatabaseBackend(object):
|
|||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def query_select(self, columns: List[ColumnId], table: TableId, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
|
||||
"""
|
||||
performs a select query on the sql database and returns the results in the form of a list of tuples (one tuple per row, tuple values are the column values)
|
||||
|
||||
:param List[ColumnId] columns: the columns to select
|
||||
:param TableId table: the name of the table to query
|
||||
:param Optional[str] where_clause: the where clause for the query, eg "matrix_size > 100"
|
||||
:param Optional[str] join_clause: the join clause for the query, eg "disables ON log.id = disables.disable_request_id"
|
||||
:param bool distinct: whether to return distinct rows
|
||||
:return: the results of the query
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
|
||||
"""
|
||||
performs an insert query on the sql database and returns the id of the inserted row
|
||||
|
||||
:param str table_id: the name of the table to insert into
|
||||
:param List[str] fields: the list of fields to insert values into
|
||||
:param List[tuple] values: the list of values to insert (one tuple per row, tuple values are the column values)
|
||||
:return: the id of the inserted row
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def table_exists(self, table_name: str) -> bool:
|
||||
"""returns true if the given table exists in the database
|
||||
|
|
@ -115,6 +145,21 @@ class ISqlDatabaseBackend(object):
|
|||
raise NotImplementedError()
|
||||
|
||||
|
||||
def values_to_sql_string(values: List[tuple]) -> str:
|
||||
'''converts a list of tuples of values into a string that can be used in a sql query, with proper escaping of string values
|
||||
|
||||
eg the list of tuples
|
||||
[('alambix42', 'disabled to move the alambix42 to another rack'), ('alambix42', 'because I want to test quman')]
|
||||
will be converted into the following string that can be used in a sql query :
|
||||
"('alambix42', 'disabled to move the alambix42 to another rack'), ('alambix42', 'because I want to test quman')"
|
||||
'''
|
||||
sql_values = []
|
||||
for value_tuple in values:
|
||||
escaped_value_tuple = tuple(["'%s'" % str(v).replace("'", r"\'") for v in value_tuple])
|
||||
sql_values.append(f'({", ".join(escaped_value_tuple)})')
|
||||
return ', '.join(sql_values)
|
||||
|
||||
|
||||
class RemoteMysqlDb(ISqlDatabaseBackend):
|
||||
def __init__(self, db_server_fqdn, db_user, db_name):
|
||||
"""
|
||||
|
|
@ -135,10 +180,29 @@ class RemoteMysqlDb(ISqlDatabaseBackend):
|
|||
"""
|
||||
:param str sql_query: the sql query to perform
|
||||
"""
|
||||
self._conn.query(sql_query)
|
||||
rows = self._conn.store_result()
|
||||
cursor = self._conn.cursor()
|
||||
cursor.execute(sql_query)
|
||||
rows = cursor.fetchall()
|
||||
logging.debug("RemoteMysqlDb.query:: results of query using cursor '%s': %s", sql_query, rows)
|
||||
return rows
|
||||
|
||||
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
|
||||
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{', '.join(columns)} FROM {table}"
|
||||
if join_clause:
|
||||
sql_query += f" JOIN {join_clause}"
|
||||
if where_clause:
|
||||
sql_query += f" WHERE {where_clause}"
|
||||
return self.query(sql_query)
|
||||
|
||||
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
|
||||
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
|
||||
sql_query = sql_query + ";SELECT last_insert_id();"
|
||||
logging.debug("RemoteMysqlDb.query_insert:: sql_query = '%s'", sql_query)
|
||||
rows = self.query(sql_query)
|
||||
last_insert_id = int(rows[0][0])
|
||||
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
|
||||
return last_insert_id
|
||||
|
||||
def table_exists(self, table_name: str) -> bool:
|
||||
rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
|
||||
assert len(rows) <= 1, f'Unexpected case: more than one ({len(rows)}) tables match the table name {table_name}.'
|
||||
|
|
@ -154,6 +218,26 @@ class RemoteMysqlDb(ISqlDatabaseBackend):
|
|||
raise NotImplementedError()
|
||||
|
||||
|
||||
def json_to_table(json_str: str) -> Table:
|
||||
# logging.debug("json_to_table:: json_str = '%s'", json_str)
|
||||
if json_str == 'NULL':
|
||||
return []
|
||||
try:
|
||||
json_data = json.loads(json_str)
|
||||
except json.JSONDecodeError:
|
||||
logging.error("json_to_table:: invalid json string: '%s'", json_str)
|
||||
raise
|
||||
assert isinstance(json_data, list), f'Expected a list of rows in the json string but got {type(json_data)}'
|
||||
table = []
|
||||
for row in json_data:
|
||||
values = []
|
||||
for column_value in row.values():
|
||||
assert isinstance(column_value, (str, int, float)), f'Expected a string, int or float value for each column value in the json string but got {type(column_value)}'
|
||||
values.append(column_value)
|
||||
table.append(tuple(values))
|
||||
return table
|
||||
|
||||
|
||||
class SshAccessedMysqlDb(ISqlDatabaseBackend):
|
||||
|
||||
"""a mysql database server accessed using ssh instead of a remote mysql client
|
||||
|
|
@ -187,9 +271,61 @@ class SshAccessedMysqlDb(ISqlDatabaseBackend):
|
|||
if completed_process.returncode != 0:
|
||||
logging.error(completed_process.stderr.decode(encoding='utf-8'))
|
||||
assert False
|
||||
rows = completed_process.stdout.decode('utf-8').split('\n')
|
||||
stdout = completed_process.stdout.decode('utf-8')
|
||||
logging.debug("SshAccessedMysqlDb.query:: results of query '%s': %s", sql_query, stdout)
|
||||
rows = stdout.split('\n') if stdout != '' else []
|
||||
return rows
|
||||
|
||||
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
|
||||
# MariaDB [quman]> SELECT JSON_ARRAYAGG(JSON_OBJECT('toto', timestamp, 'iddd', user_id)) FROM log;
|
||||
# +-------------------------------------------------------------------------------------------------------+
|
||||
# | JSON_ARRAYAGG(JSON_OBJECT('toto', timestamp, 'iddd', user_id)) |
|
||||
# +-------------------------------------------------------------------------------------------------------+
|
||||
# | [{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}] |
|
||||
# +-------------------------------------------------------------------------------------------------------+
|
||||
# 1 row in set (0,006 sec)
|
||||
# columns_list_str = ', '.join([f'"{col}"' for col in columns])
|
||||
columns_list_str = ', '.join([f"'{col}', {col}" for col in columns])
|
||||
columns_statement = f'JSON_ARRAYAGG(JSON_OBJECT({columns_list_str}))'
|
||||
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{columns_statement} FROM {table}"
|
||||
if join_clause:
|
||||
sql_query += f" JOIN {join_clause}"
|
||||
if where_clause:
|
||||
sql_query += f" WHERE {where_clause}"
|
||||
stdout = self.query(sql_query)
|
||||
json_str = stdout[-2] # eg '[{"queue_machine": "gpuonly.q@alambix104.ipr.univ-rennes1.fr"}]'
|
||||
assert isinstance(json_str, str), f'Expected a string as data line in the query output but got {type(json_str)}'
|
||||
logging.debug("SshAccessedMysqlDb.query_select:: data line of query '%s': '%s'", sql_query, json_str)
|
||||
# match = re.match(r'^\s*(?P<json_data>{[^}]*})\s*$', data_line)
|
||||
# assert match, 'Unexpected output format for query "%s" : %s' % (sql_query, stdout)
|
||||
# json_data = json.loads(match.group('json_data'))
|
||||
|
||||
table = json_to_table(json_str)
|
||||
return table
|
||||
|
||||
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
|
||||
logging.debug("SshAccessedMysqlDb.query_insert:: values = %s", values)
|
||||
# INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_id, reason) VALUES ('2026-05-26T15:31:53.564287', 'graffy', 'alambix50.ipr.univ-rennes.fr', 'gpuonly.q@alambix104.ipr.univ-rennes1.fr', 'disable', 'quman-sync', 'synchronized with grid engine');SELECT JSON_OBJECT('toto', last_insert_id());
|
||||
# Query OK, 1 row affected (0,046 sec)
|
||||
|
||||
# +---------------------------------------+
|
||||
# | JSON_OBJECT('toto', last_insert_id()) |
|
||||
# +---------------------------------------+
|
||||
# | {"toto": 9} |
|
||||
# +---------------------------------------+
|
||||
# 1 row in set (0,002 sec)
|
||||
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
|
||||
sql_query = sql_query + ";SELECT JSON_OBJECT('toto', last_insert_id());"
|
||||
logging.debug("SshAccessedMysqlDb.query_insert:: sql_query = '%s'", sql_query)
|
||||
stdout = self.query(sql_query)
|
||||
data_line = stdout[-2]
|
||||
match = re.match(r'^\s*(?P<json_data>{\s*"toto"\s*:\s*(\d+)\s*})\s*$', data_line)
|
||||
assert match
|
||||
json_data = json.loads(match.group('json_data'))
|
||||
last_insert_id = int(json_data['toto'])
|
||||
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
|
||||
return last_insert_id
|
||||
|
||||
def table_exists(self, table_name: str) -> bool:
|
||||
rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
|
||||
logging.debug('len(rows) = %d', len(rows))
|
||||
|
|
@ -245,7 +381,7 @@ class SqliteDb(ISqlDatabaseBackend):
|
|||
:param str database_name: the name of the database withing the sqlite database (eg "iprbench")
|
||||
"""
|
||||
self.sqlite_db_path = sqlite_db_path
|
||||
self._cur = None
|
||||
self._cur = None
|
||||
|
||||
check_same_thread = False
|
||||
# this is to prevent the following error when run from apache/django : SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 139672342353664 and this is thread id 139672333960960
|
||||
|
|
@ -253,7 +389,7 @@ class SqliteDb(ISqlDatabaseBackend):
|
|||
# If set False, the returned connection may be shared across multiple threads. When using multiple threads with the same connection writing operations should be serialized by the user to avoid data corruption
|
||||
# I hope it's safe here but I'm not 100% sure though. Anyway, if the database gets corrupt, it not a big deal since this memory resident database gets reconstructed from the sql file...
|
||||
|
||||
if sqlite_db_path != ':memory:' and not sqlite_db_path.exists():
|
||||
if sqlite_db_path == ':memory:' or not sqlite_db_path.exists():
|
||||
logging.debug('creating sqlite database in %s', sqlite_db_path)
|
||||
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
|
||||
else:
|
||||
|
|
@ -274,6 +410,35 @@ class SqliteDb(ISqlDatabaseBackend):
|
|||
self._con.commit()
|
||||
return rows
|
||||
|
||||
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
|
||||
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{', '.join(columns)} FROM {table}"
|
||||
if join_clause:
|
||||
sql_query += f" JOIN {join_clause}"
|
||||
if where_clause:
|
||||
sql_query += f" WHERE {where_clause}"
|
||||
return self.query(sql_query)
|
||||
|
||||
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
|
||||
"""
|
||||
performs an insert query on the sql database and returns the id of the inserted row
|
||||
|
||||
:param str table_id: the name of the table to insert into
|
||||
:param List[str] fields: the list of fields to insert values into
|
||||
:param List[tuple] values: the list of values to insert (one tuple per row, tuple values are the column values)
|
||||
:return: the id of the inserted row
|
||||
"""
|
||||
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
|
||||
logging.debug("SqliteDb.query_insert:: sql_query = '%s'", sql_query)
|
||||
logging.debug("SqliteDb.query_insert:: values = %s", values)
|
||||
self._cur.execute(sql_query)
|
||||
self._cur.execute('SELECT last_insert_rowid();')
|
||||
rows = self._cur.fetchall()
|
||||
self._con.commit()
|
||||
assert len(rows) == 1, f'Unexpected number of rows ({len(rows)}).'
|
||||
last_insert_id = int(rows[0][0])
|
||||
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
|
||||
return last_insert_id
|
||||
|
||||
def table_exists(self, table_name: str) -> bool:
|
||||
rows = self.query(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
|
||||
assert len(rows) <= 1, f'Unexpected case: more than one ({len(rows)}) tables match the table name {table_name}.'
|
||||
|
|
@ -329,7 +494,7 @@ class SqlFile(SqliteDb):
|
|||
# - the file is stored on a solid state disk
|
||||
try:
|
||||
os.remove(sqlite_db_path)
|
||||
except BaseException:
|
||||
except BaseException: # pylint: disable=broad-except
|
||||
pass
|
||||
super().__init__(sqlite_db_path)
|
||||
with open(str(self._sql_file_path), 'r', encoding='utf8') as f: # str conversion has been added to support older versions of python in which open don't accept arguments of type Path
|
||||
|
|
|
|||
|
|
@ -40,6 +40,10 @@ def is_test_machine(name):
|
|||
'physix13',
|
||||
'physix14',
|
||||
'physix15',
|
||||
'alambix12',
|
||||
'alambix13',
|
||||
'alambix14',
|
||||
'alambix15',
|
||||
]
|
||||
|
||||
|
||||
|
|
@ -315,7 +319,7 @@ def draw_machine_age_pyramid_graph(inventory):
|
|||
:param Inventory inventory: the inventory database
|
||||
"""
|
||||
|
||||
oldest_age = 20
|
||||
oldest_age = 25
|
||||
age_histogram = np.zeros(shape=(oldest_age))
|
||||
|
||||
rows = inventory.query("SELECT * FROM machines")
|
||||
|
|
@ -328,7 +332,9 @@ def draw_machine_age_pyramid_graph(inventory):
|
|||
if purchase_date is not None:
|
||||
purchase_time = matplotlib.dates.date2num(purchase_date.date()) # noqa: F841
|
||||
age = datetime.datetime.now() - purchase_date
|
||||
age_histogram[age.days // 365] += 1
|
||||
age_in_years = age.days // 365
|
||||
if age_in_years < oldest_age:
|
||||
age_histogram[age_in_years] += 1
|
||||
# print(name, age)
|
||||
|
||||
fig, ax = plt.subplots()
|
||||
|
|
@ -349,7 +355,7 @@ def draw_core_age_pyramid_graph(inventory):
|
|||
:param Inventory inventory: the inventory database
|
||||
"""
|
||||
|
||||
oldest_age = 20
|
||||
oldest_age = 25
|
||||
age_histogram = np.zeros(shape=(oldest_age))
|
||||
|
||||
rows = inventory.query("SELECT * FROM machines")
|
||||
|
|
@ -362,7 +368,9 @@ def draw_core_age_pyramid_graph(inventory):
|
|||
if purchase_date is not None:
|
||||
purchase_time = matplotlib.dates.date2num(purchase_date.date()) # noqa: F841
|
||||
age = datetime.datetime.now() - purchase_date
|
||||
age_histogram[age.days // 365] += inventory.get_num_cores(name)
|
||||
age_in_years = age.days // 365
|
||||
if age_in_years < oldest_age:
|
||||
age_histogram[age_in_years] += inventory.get_num_cores(name)
|
||||
# print(name, age)
|
||||
|
||||
fig, ax = plt.subplots()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,525 @@
|
|||
#!/usr/bin/env python3
|
||||
from abc import ABC, abstractmethod
|
||||
import sys
|
||||
from typing import List, Dict, Any, Union, Optional
|
||||
import logging
|
||||
import subprocess
|
||||
import argparse
|
||||
import re
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from cocluto.sqldb import ISqlDatabaseBackend, ISqlConnection, SqliteDb, SqlTableField, SshAccessedMysqlDb, Table
|
||||
from cocluto.ClusterController.QstatParser import QstatParser
|
||||
from cocluto.ClusterController.JobsState import JobsState
|
||||
|
||||
LogId = int # identifies a log entry in the database
|
||||
DisableId = str # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc.
|
||||
QueueId = str # identifies the queue eg main.q
|
||||
QueueMachineId = str # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr
|
||||
HostFqdn = str # fully qualified domain name of a host eg alambix42.ipr.univ-rennes.fr
|
||||
UserId = str # unix user id of a user eg graffy
|
||||
|
||||
|
||||
def is_queue_machine_id(s: str) -> bool:
|
||||
"""returns True if the given string is a QueueMachineId (i.e., it contains '@') and False otherwise."""
|
||||
# if the queue_machine contains '@', we consider it as a QueueMachineId and return it as is
|
||||
return re.match(r'^[a-zA-Z0-9_\.]+@[a-zA-Z0-9_\-\.]+$', s) is not None
|
||||
|
||||
|
||||
class QueuesStatus():
|
||||
is_enabled: Dict[QueueMachineId, bool]
|
||||
|
||||
def __init__(self, queues_status_file_path: Path = None):
|
||||
self.is_enabled = {}
|
||||
if queues_status_file_path is not None:
|
||||
self.load_from_json(queues_status_file_path)
|
||||
|
||||
def load_from_json(self, path: Path):
|
||||
with open(path, 'r', encoding='utf-8') as f:
|
||||
self.is_enabled = json.load(f)
|
||||
|
||||
def add_queue(self, queue_machine: QueueMachineId, is_enabled: bool):
|
||||
self.is_enabled[queue_machine] = is_enabled
|
||||
|
||||
def print(self):
|
||||
for queue_machine, is_enabled in self.is_enabled.items():
|
||||
print(f"{queue_machine}: {'enabled' if is_enabled else 'disabled'}")
|
||||
|
||||
def save_as_json(self, path: Path):
|
||||
with open(path, 'w', encoding='utf-8') as f:
|
||||
json.dump(self.is_enabled, f, indent=2)
|
||||
|
||||
|
||||
class IGridEngine(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def disable_queue_machine(self, queue_machine: QueueMachineId):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def enable_queue_machine(self, queue_machine: QueueMachineId):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_status(self) -> QueuesStatus:
|
||||
pass
|
||||
|
||||
|
||||
class MockGridEngine(IGridEngine):
|
||||
queues_status: QueuesStatus
|
||||
|
||||
def __init__(self, queues_status: QueuesStatus):
|
||||
self.queues_status = queues_status
|
||||
|
||||
def disable_queue_machine(self, queue_machine: QueueMachineId):
|
||||
print(f"Mock disable queue {queue_machine}")
|
||||
if queue_machine not in self.queues_status.is_enabled:
|
||||
raise RuntimeError(f"Queue {queue_machine} not found in queues {list(self.queues_status.is_enabled.keys())}")
|
||||
assert self.queues_status.is_enabled[queue_machine], f"Queue {queue_machine} is already disabled"
|
||||
self.queues_status.is_enabled[queue_machine] = False
|
||||
|
||||
def enable_queue_machine(self, queue_machine: QueueMachineId):
|
||||
print(f"Mock enable queue {queue_machine}")
|
||||
if queue_machine not in self.queues_status.is_enabled:
|
||||
raise RuntimeError(f"Queue machine {queue_machine} not found in queues {list(self.queues_status.is_enabled.keys())}")
|
||||
assert not self.queues_status.is_enabled[queue_machine], f"Queue machine {queue_machine} is already enabled"
|
||||
self.queues_status.is_enabled[queue_machine] = True
|
||||
|
||||
def get_status(self) -> QueuesStatus:
|
||||
return self.queues_status
|
||||
|
||||
|
||||
class Sge(IGridEngine):
|
||||
dry_run: bool
|
||||
qmod_is_available: Optional[bool]
|
||||
|
||||
def __init__(self, dry_run: bool = False):
|
||||
self.dry_run = dry_run
|
||||
self.qmod_is_available = None
|
||||
|
||||
def run_qmod(self, args):
|
||||
"""runs qmod with the given arguments."""
|
||||
cmd = ["qmod"] + args
|
||||
if self.dry_run:
|
||||
print(f"Dry run: {' '.join(cmd)}")
|
||||
else:
|
||||
self.check_qmod_availability()
|
||||
try:
|
||||
subprocess.run(cmd, check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise RuntimeError(f"qmod command failed: {e}") from e
|
||||
|
||||
def check_qmod_availability(self):
|
||||
if self.qmod_is_available is None:
|
||||
self.qmod_is_available = False
|
||||
# check that qmod command is available
|
||||
try:
|
||||
subprocess.run(["qmod", "-help"], check=True, capture_output=True)
|
||||
except FileNotFoundError as exc:
|
||||
raise RuntimeError("qmod command not found. Please make sure that the grid engine client is installed and qmod command is available in the PATH.") from exc
|
||||
self.qmod_is_available = True
|
||||
|
||||
def disable_queue_machine(self, queue_machine: QueueMachineId):
|
||||
self.run_qmod(["-d", queue_machine])
|
||||
|
||||
def enable_queue_machine(self, queue_machine: QueueMachineId):
|
||||
self.run_qmod(["-e", queue_machine])
|
||||
|
||||
def get_status(self) -> QueuesStatus:
|
||||
process = subprocess.run(['qstat', '-f', '-u', '*', '-pri'], check=True, capture_output=True)
|
||||
# Parse the output to extract queue statuses
|
||||
# This is a simplified example - you would need to parse the actual qstat output
|
||||
queues_status = QueuesStatus()
|
||||
qstat_parser = QstatParser()
|
||||
jobs_state: JobsState = qstat_parser.parse_qstat_output(process.stdout.decode(), cluster_domain='ipr.univ-rennes.fr')
|
||||
queue_machines = jobs_state.get_queue_machines()
|
||||
for queue_machine in queue_machines.values():
|
||||
queues_status.add_queue(queue_machine.get_name(), not queue_machine.is_disabled())
|
||||
|
||||
return queues_status
|
||||
|
||||
|
||||
def init_db(db_backend: ISqlDatabaseBackend):
|
||||
|
||||
with db_backend.connect() as conn:
|
||||
# a table storing the log of actions (queue activation or deactivation)
|
||||
if not conn.table_exists('log'):
|
||||
fields = [
|
||||
SqlTableField('id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the modification', is_autoinc_index=True),
|
||||
SqlTableField('timestamp', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this modification has been made'),
|
||||
SqlTableField('user_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the id of user performing the action (unix user id)'),
|
||||
SqlTableField('host_fqdn', SqlTableField.Type.FIELD_TYPE_STRING, 'the fully qualified domain name of the host on which the action is performed'),
|
||||
SqlTableField('queue_machines', SqlTableField.Type.FIELD_TYPE_STRING, 'the comma separated list of queue machines that were modified'),
|
||||
SqlTableField('action', SqlTableField.Type.FIELD_TYPE_STRING, 'the action performed: "disable" or "enable"'),
|
||||
SqlTableField('disable_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'),
|
||||
SqlTableField('reason', SqlTableField.Type.FIELD_TYPE_STRING, 'the reason for the modification'),
|
||||
]
|
||||
conn.create_table('log', fields)
|
||||
|
||||
# a table storing the current disable requests
|
||||
if not conn.table_exists('disables'):
|
||||
fields = [
|
||||
SqlTableField('disable_request_id', SqlTableField.Type.FIELD_TYPE_INT, 'log.id of the disable action that led to this state'),
|
||||
SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue machine that was disabled'),
|
||||
]
|
||||
conn.create_table('disables', fields)
|
||||
|
||||
# a table storing the current queues
|
||||
if not conn.table_exists('queues'):
|
||||
fields = [
|
||||
SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue'),
|
||||
]
|
||||
conn.create_table('queues', fields)
|
||||
|
||||
|
||||
def create_db_backend(db_def: str) -> ISqlDatabaseBackend:
|
||||
# db_def: eg '{"type": "mysql", "args": {"server": "db.server.com", "user": "db_user", "name": "quman_db", "ssh_user": "ssh_user"}}' or '{"type": "sqlite", "args": {"database": "./quman.db"}}'
|
||||
db_def = json.loads(db_def)
|
||||
db_type = db_def['type']
|
||||
db_params = db_def['params']
|
||||
if db_type == 'mysql_via_ssh':
|
||||
sql_server_fqdn = db_params['sql_server_fqdn'] # the fully qualified domain name of the mysql server hosting the database eg 'alambix-master.ipr.univ-rennes.fr'
|
||||
db_user = db_params['db_user'] # eg 'qumandbw'
|
||||
db_name = db_params['db_name'] # the name of the database, eg 'quman'
|
||||
ssh_user = db_params['ssh_user'] # the ssh user which has the privileges to access the mysql database, eg 'qumandbw'
|
||||
backend = SshAccessedMysqlDb(sql_server_fqdn, db_user, db_name, ssh_user)
|
||||
command = f'ssh "{ssh_user}@{sql_server_fqdn}" "hostname"'
|
||||
completed_process = subprocess.run(command, shell=True, check=False, capture_output=True)
|
||||
if completed_process.returncode != 0:
|
||||
raise RuntimeError(f"Failed to connect to the mysql server via ssh with command '{command}'. Please check the connection parameters and make sure that the ssh user has access to the server. Error message: {completed_process.stderr.decode()}")
|
||||
elif db_type == 'sqlite':
|
||||
db_path = Path(db_params['sqlite_file_path']) # eg./quman_test/quman.sqlite
|
||||
backend = SqliteDb(db_path)
|
||||
else:
|
||||
raise ValueError("Unsupported database type: %s" % db_type)
|
||||
|
||||
init_db(backend)
|
||||
|
||||
return backend
|
||||
|
||||
|
||||
class DisableRequest:
|
||||
db_id: int # index of this disable request in the database
|
||||
user_id: UserId # the id of the user who requested the disable action (unix user id)
|
||||
host_fqdn: HostFqdn # the fully qualified domain name of the host on which the disable action was requested
|
||||
queue_machines: List[QueueMachineId] # the names of the queue machines that were modified
|
||||
reason: str
|
||||
disable_id: DisableId
|
||||
timestamp: datetime
|
||||
|
||||
def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_machines: List[QueueMachineId], reason: str, disable_id: DisableId, timestamp: datetime):
|
||||
self.log_id = log_id
|
||||
self.user_id = user_id
|
||||
self.host_fqdn = host_fqdn
|
||||
self.queue_machines = queue_machines
|
||||
self.reason = reason
|
||||
self.disable_id = disable_id
|
||||
self.timestamp = timestamp
|
||||
|
||||
def as_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"log_id": self.log_id,
|
||||
"user_id": self.user_id,
|
||||
"host_fqdn": self.host_fqdn,
|
||||
"queue_machines": self.queue_machines,
|
||||
"reason": self.reason,
|
||||
"disable_id": self.disable_id,
|
||||
"timestamp": self.timestamp.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
class QueueDisableState:
|
||||
state: Dict[QueueMachineId, List[DisableRequest]]
|
||||
|
||||
def __init__(self):
|
||||
self.state = {}
|
||||
|
||||
def set_queue_machine_state(self, queue_machine: QueueMachineId, disable_requests: List[DisableRequest]):
|
||||
assert all(isinstance(dr, DisableRequest) for dr in disable_requests)
|
||||
self.state[queue_machine] = disable_requests
|
||||
|
||||
def as_dict(self) -> str:
|
||||
self_as_dict = {}
|
||||
for queue_machine, disable_requests in self.state.items():
|
||||
self_as_dict[queue_machine] = {
|
||||
"disable_requests": [dr.as_dict() for dr in disable_requests]
|
||||
}
|
||||
return self_as_dict
|
||||
|
||||
|
||||
class QueueManager:
|
||||
sql_conn: ISqlConnection
|
||||
grid_engine: IGridEngine
|
||||
|
||||
def __init__(self, sql_conn: ISqlConnection, grid_engine: IGridEngine = None):
|
||||
assert isinstance(sql_conn, ISqlConnection)
|
||||
self.sql_conn = sql_conn
|
||||
if grid_engine is None:
|
||||
grid_engine = Sge()
|
||||
self.grid_engine = grid_engine
|
||||
|
||||
def log_modification(self, queue_machines: List[QueueMachineId], action: str, disable_id: DisableId, reason: str) -> LogId:
|
||||
assert isinstance(queue_machines, list)
|
||||
assert action in ["disable", "enable"], "Action must be either 'disable' or 'enable'"
|
||||
userid = subprocess.check_output(['whoami']).decode().strip()
|
||||
host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip()
|
||||
timestamp = datetime.now().isoformat()
|
||||
log_id = self.sql_conn.query_insert(table_id="log", fields=['timestamp', 'user_id', 'host_fqdn', 'queue_machines', 'action', 'disable_id', 'reason'], values=[(timestamp, userid, host_fqdn, ','.join(queue_machines), action, disable_id, reason)])
|
||||
return log_id
|
||||
|
||||
def get_disable_requests(self, queue_machine: QueueMachineId) -> Dict[int, DisableRequest]:
|
||||
results_table = self.sql_conn.query_select(['log.id', 'log.user_id', 'log.host_fqdn', 'log.queue_machines', 'log.reason', 'log.disable_id', 'log.timestamp'], join_clause="disables ON log.id = disables.disable_request_id", table="log", where_clause=f"disables.queue_machine = '{queue_machine}' AND log.action = 'disable'")
|
||||
assert isinstance(results_table, Table), f"Expected results_table to be of type Table but got {type(results_table)}"
|
||||
disable_requests = []
|
||||
for row in results_table.rows:
|
||||
log_id = row[0]
|
||||
user_id = row[1]
|
||||
host_fqdn = row[2]
|
||||
queue_machines = row[3].split(',')
|
||||
assert queue_machine == queue_machine, "All results should be for the same queue"
|
||||
reason = row[4]
|
||||
disable_id = row[5]
|
||||
timestamp = datetime.fromisoformat(row[6])
|
||||
disable_requests.append(DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_id=disable_id, timestamp=timestamp))
|
||||
return disable_requests
|
||||
|
||||
def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_disable: bool = True):
|
||||
logging.info("Requesting deactivation of queue machines %s with disable id '%s' for reason: %s", queue_machines, disable_id, reason)
|
||||
all_disable_requests = {}
|
||||
for queue_machine in queue_machines:
|
||||
all_disable_requests[queue_machine] = self.get_disable_requests(queue_machine)
|
||||
disable_requests = all_disable_requests[queue_machine]
|
||||
for dr in disable_requests:
|
||||
if dr.disable_id == disable_id:
|
||||
raise RuntimeError(f"Disable id {disable_id} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first.")
|
||||
|
||||
for queue_machine in queue_machines:
|
||||
if perform_disable:
|
||||
disable_requests = all_disable_requests[queue_machine]
|
||||
if len(disable_requests) == 0:
|
||||
# queue is currently active, we can disable it
|
||||
self.grid_engine.disable_queue_machine(queue_machine)
|
||||
|
||||
disable_log_id = self.log_modification(queue_machines, "disable", disable_id, reason)
|
||||
for queue_machine in queue_machines:
|
||||
self.sql_conn.query(f"INSERT INTO disables (disable_request_id, queue_machine) VALUES ({disable_log_id}, '{queue_machine}');")
|
||||
|
||||
def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_enable: bool = True):
|
||||
for queue_machine in queue_machines:
|
||||
disable_requests = self.get_disable_requests(queue_machine)
|
||||
dr_to_remove = None # the disable reason to remove
|
||||
for dr in disable_requests:
|
||||
if dr.disable_id == disable_id:
|
||||
dr_to_remove = dr
|
||||
break
|
||||
|
||||
if dr_to_remove is None:
|
||||
raise RuntimeError(f"Disable id {disable_id} has not requested deactivation of queue {queue_machine}. Cannot request activation without a prior deactivation.")
|
||||
|
||||
if perform_enable:
|
||||
if len(disable_requests) == 1:
|
||||
# queue is currently disabled and there is only one disable reason, we can enable it
|
||||
self.grid_engine.enable_queue_machine(queue_machine)
|
||||
enable_log_id = self.log_modification(queue_machines, "enable", disable_id, reason, ) # noqa: F841 pylint:disable=unused-variable
|
||||
for queue_machine in queue_machines:
|
||||
logging.debug('deleting disable request %s', dr_to_remove.log_id)
|
||||
self.sql_conn.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';")
|
||||
|
||||
def synchronize_with_grid_engine(self):
|
||||
"""synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat."""
|
||||
logging.info('fixing potential inconsistencies (caused by actors using qmod instead of quman) of quman database with the current state of the cluster queues')
|
||||
qs = self.grid_engine.get_status()
|
||||
|
||||
db_queues = set()
|
||||
results_table = self.sql_conn.query_select(columns=["queue_machine"], table="queues")
|
||||
logging.debug("synchronize_with_grid_engine: results of query': %s", results_table)
|
||||
for row in results_table.rows:
|
||||
assert len(row) == 1, "Each row should have only one column (queue_machine) but got row='%s' (len=%d)" % (str(row), len(row))
|
||||
db_queues.add(row[0])
|
||||
|
||||
for queue_machine, is_enabled in qs.is_enabled.items():
|
||||
if queue_machine not in db_queues:
|
||||
# if the queue is not in the database, we add it
|
||||
logging.warning("Queue %s is not in the database, adding it.", queue_machine)
|
||||
self.sql_conn.query(f"INSERT INTO queues (queue_machine) VALUES ('{queue_machine}');")
|
||||
else:
|
||||
db_queues.remove(queue_machine) # we remove it from the set of queues in the database to keep track of the queues that are in the database but not in the grid engine
|
||||
disable_requests = self.get_disable_requests(queue_machine)
|
||||
if not is_enabled and len(disable_requests) == 0:
|
||||
# queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_id "unknown" and reason "synchronized with grid engine"
|
||||
disable_id = "quman-sync"
|
||||
logging.warning('adding disable %s on %s because this queue is disabled in grid engine', disable_id, queue_machine)
|
||||
self.request_queue_machines_deactivation([queue_machine], disable_id, "synchronized with grid engine", perform_disable=False)
|
||||
assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none."
|
||||
elif is_enabled and len(disable_requests) > 0:
|
||||
# queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_id "unknown" with reason "synchronized with grid engine"
|
||||
for dr in disable_requests:
|
||||
logging.warning('removing disable %s on %s because this queue is enabled in grid engine', dr.disable_id, queue_machine)
|
||||
self.request_queue_machines_activation([queue_machine], dr.disable_id, "synchronized with grid engine", perform_enable=False)
|
||||
assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests."
|
||||
for queue_machine in db_queues:
|
||||
logging.warning("Queue %s is in the database but not in the grid engine. Removing it from the database.", queue_machine)
|
||||
self.sql_conn.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';")
|
||||
self.sql_conn.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';")
|
||||
logging.info('quman database is consistent with the current state of cluster queues')
|
||||
|
||||
def get_state(self) -> QueueDisableState:
|
||||
"""returns the state of the queues."""
|
||||
# get the list of queue names from the disables table in the database
|
||||
table = self.sql_conn.query_select(columns=["queue_machine"], table="disables", distinct=True)
|
||||
for row in table.rows:
|
||||
assert len(row) == 1, "Each row should have only one column (queue_machine)"
|
||||
queue_machines = [row[0] for row in table.rows]
|
||||
|
||||
state = QueueDisableState()
|
||||
for queue_machine in queue_machines:
|
||||
disable_requests = self.get_disable_requests(queue_machine)
|
||||
state.set_queue_machine_state(queue_machine, disable_requests)
|
||||
return state
|
||||
|
||||
def get_queue_machines(self, queue_machine: Union[QueueMachineId, QueueId]) -> List[QueueMachineId]:
|
||||
logging.debug("Getting queue machines for queue_machine identifier '%s'", queue_machine)
|
||||
queue_machines = []
|
||||
if is_queue_machine_id(queue_machine):
|
||||
queue_machines = [queue_machine]
|
||||
else:
|
||||
status = self.grid_engine.get_status()
|
||||
queue_machines = [q for q in status.is_enabled if q.startswith(queue_machine)]
|
||||
assert len(queue_machines) > 0, f"No queue machine found for queue identifier '{queue_machine}'"
|
||||
logging.debug("Found queue machines %s for queue_machine identifier '%s'", queue_machines, queue_machine)
|
||||
return queue_machines
|
||||
|
||||
|
||||
class CustomHelpFormatter(
|
||||
argparse.ArgumentDefaultsHelpFormatter, # to get the default values in the help message
|
||||
argparse.RawDescriptionHelpFormatter # to get the newlines in the description and epilog to be properly formatted in the help message
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="qman: manage queue disable/enable requests with logging",
|
||||
prog="quman",
|
||||
epilog="Example usages:\n"
|
||||
" quman add-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance'\n"
|
||||
" quman remove-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance completed'\n"
|
||||
" quman add-disable-request main.q --disable-id admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'\n"
|
||||
" quman show-disable-requests main.q@node42\n"
|
||||
" quman show-disable-requests\n"
|
||||
" quman --db-def='{\"type\": \"sqlite\", \"params\": {\"sqlite_file_path\": \"/tmp/quman.sqlite\"} }' show-disable-requests\n"
|
||||
" quman --db-def='{\"type\": \"mysql_via_ssh\", \"params\": {\"sql_server_fqdn\": \"alambix-master.ipr.univ-rennes.fr\", \"db_user\": \"qumandbw\", \"db_name\": \"quman\", \"ssh_user\": \"qumandbw\"} }' show-disable-requests\n",
|
||||
formatter_class=CustomHelpFormatter
|
||||
)
|
||||
subparsers = parser.add_subparsers(dest="action", help="Action to perform")
|
||||
parser.add_argument("--test", action="store_true", help="Run in test mode with MockGridEngine and a local sqlite database. This is meant for testing and development purposes and should not be used in production.")
|
||||
default_db_def = {
|
||||
"type": "mysql_via_ssh",
|
||||
"params": {
|
||||
"sql_server_fqdn": "alambix-master.ipr.univ-rennes.fr",
|
||||
"db_user": "qumandbw",
|
||||
"db_name": "quman",
|
||||
"ssh_user": "qumandbw"}}
|
||||
parser.add_argument("--json", action="store_true", help="Output results in JSON format.")
|
||||
parser.add_argument("--db-def", type=str, default=json.dumps(default_db_def), help="the definition in json format of the database storing the disable requests.")
|
||||
# the db synchronization mechanism needs some requirements that are not always available (sge commands such as qmod and qstat, write access on quman's database), as for example on simpaweb; in such environments, --disable-sync allows quman to work for query-only actions such as show-disable-requests
|
||||
parser.add_argument("--disable-sync", action="store_true", default=False, help="if set, disable the update of the db to fix potential inconsistencies with the current state of the queues, prior to execute the asked action")
|
||||
|
||||
# add-disable action
|
||||
add_parser = subparsers.add_parser("add-disable-request", help="adds a disable request to a queue")
|
||||
add_parser.add_argument("queue", help="Queue to disable (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)")
|
||||
add_parser.add_argument("--disable-id", required=True, help="id for the disable request (e.g., croconaus, manual.graffy, etc.)")
|
||||
add_parser.add_argument("--reason", required=True, help="Reason for the disabling")
|
||||
|
||||
# remove-disable action
|
||||
remove_parser = subparsers.add_parser("remove-disable-request", help="removes a disable request on a queue")
|
||||
remove_parser.add_argument("queue", help="Queue to enable (e.g., main.q@node42@univ-rennes.fr, main.q, etc.)")
|
||||
remove_parser.add_argument("--disable-id", required=True, help="id of the disable request to remove")
|
||||
remove_parser.add_argument("--reason", default="Manual removal", help="Reason for the removal")
|
||||
|
||||
# show-disable-requests action
|
||||
show_parser = subparsers.add_parser("show-disable-requests", help="Show disable requests")
|
||||
show_parser.add_argument("queue", nargs="?", default=None, help="Optional: specific queue to show (if omitted, shows all)")
|
||||
|
||||
# update-db action
|
||||
update_db_parser = subparsers.add_parser("update-db", help="updates quman database to synchronize with the actual state of the queues in the grid engine by querying qstat. Only needed if the grid engine state has been modified outside of quman (e.g., by manually running qmod). This is automatically done at the beginning of each quman command, so there should be no need to run this command manually in normal usage.") # noqa: F841 pylint:disable=unused-variable
|
||||
|
||||
try:
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.action is None:
|
||||
parser.print_help()
|
||||
exit(1)
|
||||
|
||||
test_mode = args.test
|
||||
|
||||
if test_mode:
|
||||
# logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
logging.warning("Running in test mode with MockGridEngine. No actual queue will be enabled or disabled.")
|
||||
|
||||
else:
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
if test_mode:
|
||||
queues_status_file_path = Path('./quman_test/queues_status.json')
|
||||
if queues_status_file_path.exists():
|
||||
logging.info("Loading queues status from %s", queues_status_file_path)
|
||||
qs = QueuesStatus(queues_status_file_path)
|
||||
else:
|
||||
logging.info("No queues status file found at %s. Initializing with default queues status.", queues_status_file_path)
|
||||
qs = QueuesStatus()
|
||||
for node_id in range(40, 44):
|
||||
qs.add_queue(f'main.q@alambix{node_id}.ipr.univ-rennes.fr', True)
|
||||
qs.add_queue('gpuonly.q@alambix42', True)
|
||||
grid_engine = MockGridEngine(qs)
|
||||
else:
|
||||
grid_engine = Sge()
|
||||
db_backend = create_db_backend(args.db_def)
|
||||
with db_backend.connect() as conn:
|
||||
assert isinstance(conn, ISqlConnection), f"Expected db_backend.connect() to return an ISqlConnection but got {type(conn)}"
|
||||
logging.debug("Connected to database successfully with connection %s", conn)
|
||||
quman = QueueManager(conn, grid_engine)
|
||||
|
||||
if not args.disable_sync:
|
||||
# fix potential inconsistencies in quman db regarding the current state of the queues
|
||||
quman.synchronize_with_grid_engine()
|
||||
|
||||
if args.action == "add-disable-request":
|
||||
queue_machines = quman.get_queue_machines(args.queue)
|
||||
quman.request_queue_machines_deactivation(queue_machines, args.disable_id, args.reason)
|
||||
elif args.action == "remove-disable-request":
|
||||
queue_machines = quman.get_queue_machines(args.queue)
|
||||
quman.request_queue_machines_activation(queue_machines, args.disable_id, args.reason)
|
||||
elif args.action == "show-disable-requests":
|
||||
if args.queue is None:
|
||||
# Show all disable requests
|
||||
state = quman.get_state()
|
||||
else:
|
||||
# Show disable requests for a specific queue
|
||||
queue_machines = quman.get_queue_machines(args.queue)
|
||||
state = QueueDisableState()
|
||||
for queue_machine in queue_machines:
|
||||
disable_requests = quman.get_disable_requests(queue_machine)
|
||||
state.set_queue_machine_state(queue_machine, disable_requests)
|
||||
if args.json:
|
||||
print(json.dumps(state.as_dict(), indent=2))
|
||||
else:
|
||||
for queue_machine, disable_requests in state.state.items():
|
||||
print(f"Queue machine: {queue_machine}")
|
||||
for dr in disable_requests:
|
||||
print(f" Disable request {dr.log_id} by {dr.user_id} on {dr.host_fqdn} at {dr.timestamp.isoformat()} with disable id '{dr.disable_id}' for reason: {dr.reason}")
|
||||
elif args.action == "update-db":
|
||||
quman.synchronize_with_grid_engine()
|
||||
|
||||
if test_mode:
|
||||
grid_engine.get_status().save_as_json(queues_status_file_path)
|
||||
|
||||
except RuntimeError as e:
|
||||
sys.stderr.write(f"ERROR: {e}\n")
|
||||
exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -0,0 +1,786 @@
|
|||
from typing import Union, List, Optional, Tuple
|
||||
from pathlib import Path
|
||||
from enum import Enum
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import os
|
||||
import abc
|
||||
import sqlite3
|
||||
from .mysql2sqlite import mysql_to_sqlite
|
||||
import subprocess
|
||||
import traceback
|
||||
import pexpect
|
||||
|
||||
|
||||
SqlQuery = str
|
||||
|
||||
|
||||
class Table:
|
||||
'''a table is a list of rows, where each row is a tuple of column values
|
||||
'''
|
||||
column_names: Optional[List[str]]
|
||||
rows: List[Tuple]
|
||||
|
||||
def __init__(self, column_names: Optional[List[str]], rows: List[Tuple]):
|
||||
self.column_names = column_names
|
||||
self.rows = rows
|
||||
|
||||
def get_value(self, row: int, col: Union[int, str]) -> Union[str, int, float]:
|
||||
'''returns the value at the given row and column in this table
|
||||
:param row: the index of the row (0-based)
|
||||
:param col: the index or name of the column
|
||||
'''
|
||||
if isinstance(col, str):
|
||||
assert self.column_names is not None, 'Cannot get value by column name because this table does not have column names'
|
||||
try:
|
||||
col_index = self.column_names.index(col)
|
||||
except ValueError as exc:
|
||||
raise ValueError(f'Column name "{col}" not found in table columns {self.column_names}') from exc
|
||||
col = col_index
|
||||
assert isinstance(col, int), f'Unexpected type for col: expected int but got {type(col)}'
|
||||
assert 0 <= row < len(self.rows), f'Row index {row} is out of bounds for table with {len(self.rows)} rows'
|
||||
assert 0 <= col < len(self.rows[row]), f'Column index {col} is out of bounds for table with {len(self.rows[row])} columns'
|
||||
return self.rows[row][col]
|
||||
|
||||
def check_integrity(self):
|
||||
'''checks that all rows in this table have the same number of columns and that the number of column names (if any) matches the number of columns in the rows
|
||||
'''
|
||||
num_cols = None
|
||||
for row in self.rows:
|
||||
if num_cols is None:
|
||||
num_cols = len(row)
|
||||
else:
|
||||
assert len(row) == num_cols, f'All rows in the table should have the same number of columns but got a row with {len(row)} columns while previous rows have {num_cols} columns'
|
||||
if self.column_names is not None:
|
||||
assert len(self.column_names) == num_cols, f'Number of column names {len(self.column_names)} does not match number of columns in rows {num_cols}'
|
||||
|
||||
@property
|
||||
def num_rows(self) -> int:
|
||||
return len(self.rows)
|
||||
|
||||
@property
|
||||
def num_cols(self) -> int:
|
||||
nc = len(self.rows[0]) if len(self.rows) > 0 else 0
|
||||
if self.column_names is not None:
|
||||
assert len(self.column_names) == nc, f'Number of column names {len(self.column_names)} does not match number of columns in rows {nc}'
|
||||
return nc
|
||||
|
||||
|
||||
class SqlTableField():
|
||||
'''description of a field of a sql table
|
||||
'''
|
||||
class Type(Enum):
|
||||
FIELD_TYPE_STRING = 0
|
||||
FIELD_TYPE_INT = 1
|
||||
FIELD_TYPE_FLOAT = 2
|
||||
FIELD_TYPE_TIME = 3
|
||||
|
||||
name: str # the name of the field, eg 'matrix_size'
|
||||
field_type: Type # the type of the field, eg 'PARAM_TYPE_INT'
|
||||
description: str # the description of the field, eg 'the size n of the n*n matrix '
|
||||
is_autoinc_index: bool # indicates if theis field is used as an autoincrement index in the table
|
||||
|
||||
def __init__(self, name: str, field_type: Type, description: str, is_autoinc_index=False):
|
||||
if is_autoinc_index:
|
||||
assert field_type == SqlTableField.Type.FIELD_TYPE_INT, 'only an integer field can be used as a autoincrement table index'
|
||||
self.name = name
|
||||
self.field_type = field_type
|
||||
self.description = description
|
||||
self.is_autoinc_index = is_autoinc_index
|
||||
|
||||
|
||||
ColumnId = str # eg 'matrix_size'
|
||||
TableId = str # eg 'benchmark_results'
|
||||
|
||||
|
||||
class ISqlConnection(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def query(self, sql_query: SqlQuery) -> List[str]:
|
||||
"""
|
||||
:param str sql_query: the sql query to perform
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def query_select(self, columns: List[ColumnId], table: TableId, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
|
||||
"""
|
||||
performs a select query on the sql database and returns the results in the form of a list of tuples (one tuple per row, tuple values are the column values)
|
||||
|
||||
:param List[ColumnId] columns: the columns to select
|
||||
:param TableId table: the name of the table to query
|
||||
:param Optional[str] where_clause: the where clause for the query, eg "matrix_size > 100"
|
||||
:param Optional[str] join_clause: the join clause for the query, eg "disables ON log.id = disables.disable_request_id"
|
||||
:param bool distinct: whether to return distinct rows
|
||||
:return: the results of the query
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
|
||||
"""
|
||||
performs an insert query on the sql database and returns the id of the inserted row
|
||||
|
||||
:param str table_id: the name of the table to insert into
|
||||
:param List[str] fields: the list of fields to insert values into
|
||||
:param List[tuple] values: the list of values to insert (one tuple per row, tuple values are the column values)
|
||||
:return: the id of the inserted row
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def table_exists(self, table_name: str) -> bool:
|
||||
"""returns true if the given table exists in the database
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_table(self, table_name: str, fields: List[SqlTableField]):
|
||||
"""creates the table in this sql database
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete_table(self, table_name: str):
|
||||
"""deletes the given table in this sql database
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
# @abc.abstractmethod
|
||||
# def add_table_row(self, table_name: str, values: List[SqlTableField]):
|
||||
# """creates the table in this sql database
|
||||
# """
|
||||
# raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
|
||||
"""returns the sql directive for the declaration of the given table field (eg "`matrix_size` real NOT NULL COMMENT 'the size of the matrix'")
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def dump(self, sql_file_path: Path):
|
||||
"""dumps this database into the given sql file"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class ISqlDatabaseBackend(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def connect(self) -> ISqlConnection:
|
||||
"""returns a connection to this sql database backend that can be used to perform sql queries on the database"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def values_to_sql_string(values: List[tuple]) -> str:
|
||||
'''converts a list of tuples of values into a string that can be used in a sql query, with proper escaping of string values
|
||||
|
||||
eg the list of tuples
|
||||
[('alambix42', 'disabled to move the alambix42 to another rack'), ('alambix42', 'because I want to test quman')]
|
||||
will be converted into the following string that can be used in a sql query :
|
||||
"('alambix42', 'disabled to move the alambix42 to another rack'), ('alambix42', 'because I want to test quman')"
|
||||
'''
|
||||
sql_values = []
|
||||
for value_tuple in values:
|
||||
escaped_value_tuple = tuple(["'%s'" % str(v).replace("'", r"\'") for v in value_tuple])
|
||||
sql_values.append(f'({", ".join(escaped_value_tuple)})')
|
||||
return ', '.join(sql_values)
|
||||
|
||||
|
||||
# class RemoteMysqlDb(ISqlDatabaseBackend):
|
||||
# def __init__(self, db_server_fqdn, db_user, db_name):
|
||||
# """
|
||||
# :param str db_server_fqdn: the fully qualified domain name of the server hosting the database, eg simpatix10.univ-rennes1.fr
|
||||
# :param str db_user: the user for accessing the inventory database, eg simpadb_reader
|
||||
# :param str db_name: the name of the inventory database, eg simpadb
|
||||
# """
|
||||
# self.db_server_fqdn = db_server_fqdn
|
||||
# self.db_user = db_user
|
||||
# self.db_name = db_name
|
||||
# self._connect()
|
||||
|
||||
# def _connect(self):
|
||||
# self._conn = MySQLdb.connect(self.db_server_fqdn, self.db_user, '', self.db_name)
|
||||
# assert self._conn
|
||||
|
||||
# def query(self, sql_query) -> List[str]:
|
||||
# """
|
||||
# :param str sql_query: the sql query to perform
|
||||
# """
|
||||
# cursor = self._conn.cursor()
|
||||
# cursor.execute(sql_query)
|
||||
# rows = cursor.fetchall()
|
||||
# logging.debug("RemoteMysqlDb.query:: results of query using cursor '%s': %s", sql_query, rows)
|
||||
# return rows
|
||||
|
||||
# def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
|
||||
# sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{', '.join(columns)} FROM {table}"
|
||||
# if join_clause:
|
||||
# sql_query += f" JOIN {join_clause}"
|
||||
# if where_clause:
|
||||
# sql_query += f" WHERE {where_clause}"
|
||||
# return self.query(sql_query)
|
||||
|
||||
# def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
|
||||
# sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
|
||||
# sql_query = sql_query + ";SELECT last_insert_id();"
|
||||
# logging.debug("RemoteMysqlDb.query_insert:: sql_query = '%s'", sql_query)
|
||||
# rows = self.query(sql_query)
|
||||
# last_insert_id = int(rows[0][0])
|
||||
# assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
|
||||
# return last_insert_id
|
||||
|
||||
# def table_exists(self, table_name: str) -> bool:
|
||||
# rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
|
||||
# assert len(rows) <= 1, f'Unexpected case: more than one ({len(rows)}) tables match the table name {table_name}.'
|
||||
# return len(rows) == 1
|
||||
|
||||
# def create_table(self, table_name: str, fields: List[SqlTableField]):
|
||||
# raise NotImplementedError()
|
||||
|
||||
# def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
|
||||
# return f'`{field_name}` {field_sql_type} COMMENT \'{field_description}\''
|
||||
|
||||
# def dump(self, sql_file_path: Path):
|
||||
# raise NotImplementedError()
|
||||
|
||||
|
||||
def json_to_table(json_str: str) -> Table:
|
||||
assert isinstance(json_str, str), f'Expected a string as input for json_to_table but got {type(json_str)}' # eg '[{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}]'
|
||||
# logging.debug("json_to_table:: json_str = '%s'", json_str)
|
||||
if json_str == 'NULL':
|
||||
return Table(column_names=None, rows=[])
|
||||
try:
|
||||
json_data = json.loads(json_str)
|
||||
except json.JSONDecodeError:
|
||||
logging.error("json_to_table:: invalid json string: '%s'", json_str)
|
||||
raise
|
||||
assert isinstance(json_data, list), f'Expected a list of rows in the json string but got {type(json_data)}'
|
||||
table = []
|
||||
for row in json_data:
|
||||
row_values = []
|
||||
for value in row.values():
|
||||
assert isinstance(value, (str, int, float)), f'Expected a string, int or float value for each column value in the json string but got {type(value)}'
|
||||
row_values.append(value)
|
||||
table.append(tuple(row_values))
|
||||
return Table(column_names=None, rows=table)
|
||||
|
||||
|
||||
def print_line_chars(line: str):
|
||||
for char in line:
|
||||
# char identifier, eg 'ESC' for 0x1b
|
||||
char_name = {
|
||||
'\x1b': 'ESC',
|
||||
'\r': 'CR',
|
||||
'\n': 'LF'
|
||||
}.get(char, repr(char))
|
||||
logging.debug('char = \'%s\' (hex=%s)', char_name, char.encode("utf-8").hex())
|
||||
|
||||
|
||||
def strip_vt100(text):
|
||||
# ANSI: Standardized, starts with \x1b[.
|
||||
# VT100: Includes private sequences like \x1b(B, \x1b(A, \x1b>, etc., which are not ANSI but widely supported in terminal emulators.
|
||||
# Matches all VT100 escape sequences:
|
||||
# - CSI: \x1b[... (e.g., \x1b[0;1m)
|
||||
# - Fe: \x1b(..., \x1b), \x1b> (e.g., \x1b(B, \x1b>A)
|
||||
# - Single-character: \x1b[@-Z\\_-]
|
||||
vt100_escape = re.compile(r'\x1b(?:[@-Z\\_-]|\[[0-?]*[ -/]*[@-~]|\([A-Za-z]|\)[A-Za-z])')
|
||||
return vt100_escape.sub('', text)
|
||||
|
||||
|
||||
def parse_mysql_stdout(stdout: str) -> Table:
|
||||
# example of mysql query output for the query "SHOW TABLES LIKE 'log';":
|
||||
# > SHOW TABLES LIKE 'log';
|
||||
# > +-----------------------+
|
||||
# > | Tables_in_quman (log) |
|
||||
# > +-----------------------+
|
||||
# > | log |
|
||||
# > +-----------------------+
|
||||
# > 1 row in set (0,001 sec)
|
||||
# >
|
||||
# >
|
||||
# logging.debug("parse_mysql_stdout:: stdout = '%s'", stdout)
|
||||
table = []
|
||||
line_index = 0
|
||||
num_horizontal_separators = 0
|
||||
num_rows = None
|
||||
num_empty_lines = 0
|
||||
column_names = None
|
||||
for line in stdout.split('\n'):
|
||||
# print_line_chars(line)
|
||||
line = strip_vt100(line)
|
||||
line = line.strip('\r') # remove potential carriage return characters that can be present in the mysql output
|
||||
# remove all ansi escape codes
|
||||
# logging.debug("parse_mysql_stdout:: line = '%s' (%d characters, hex=%s)", line, len(line), line.encode('utf-8').hex())
|
||||
if re.match(r'^\s*\+\-+\+\s*$', line):
|
||||
num_horizontal_separators += 1
|
||||
elif re.match(r'^\|.*\|$', line):
|
||||
# eg '| Tables_in_quman (log) |'
|
||||
is_header = num_horizontal_separators == 1
|
||||
if is_header:
|
||||
column_names = [col.strip() for col in line.split('|')[1:-1]]
|
||||
else:
|
||||
assert num_horizontal_separators == 2, f'Unexpected case: a line with values should be between two horizontal separators but got num_horizontal_separators={num_horizontal_separators} for line="{line}"'
|
||||
values = [line.strip() for line in line.split('|')[1:-1]]
|
||||
table.append(tuple(values))
|
||||
elif line.endswith(';'):
|
||||
# this is the echo of the query that we sent to mysql, we can ignore it but we check that it is indeed the first line of the output
|
||||
# eg: 'SHOW TABLES LIKE 'log';'
|
||||
assert line_index == 0, f'Unexpected case: the first line of the mysql output should be the query echo but got "{line}"'
|
||||
elif re.match(r'^Empty set \([^\)]*\)$', line):
|
||||
# eg: 'Empty set (0,000 sec)'
|
||||
num_rows = 0
|
||||
elif re.match(r'^Query OK', line):
|
||||
# eg Query OK, 1 row affected (0,005 sec)
|
||||
pass
|
||||
else:
|
||||
m = re.match(r'^\s*(?P<num_rows>\d+)\s+row[s]*\s+in\s+set\s+\(.*\)\s*$', line)
|
||||
if m:
|
||||
# eg '1 row in set (0,001 sec)'
|
||||
num_rows = int(m.group('num_rows'))
|
||||
elif line.strip() == '':
|
||||
num_empty_lines += 1
|
||||
else:
|
||||
assert False, f'Unexpected line in mysql output: "{line}"'
|
||||
line_index += 1
|
||||
if num_rows is None:
|
||||
num_rows = len(table)
|
||||
assert num_rows == len(table), f'Unexpected case: the number of rows in the mysql output should be {num_rows} but got {len(table)}.'
|
||||
if num_rows > 0:
|
||||
assert num_horizontal_separators == 3, f'Unexpected case: there should be exactly 3 horizontal separators in the mysql output but got {num_horizontal_separators}.'
|
||||
return Table(column_names=column_names, rows=table)
|
||||
|
||||
|
||||
class SshAccessedMysqlConnection(ISqlConnection):
|
||||
|
||||
ssh_mysql_db: 'SshAccessedMysqlDb'
|
||||
mysql_spawn: Optional[pexpect.spawn] = None
|
||||
mysql_prompt_re: str
|
||||
|
||||
def __init__(self, ssh_mysql_db: 'SshAccessedMysqlDb'):
|
||||
self.ssh_mysql_db = ssh_mysql_db
|
||||
self.mysql_spawn = None
|
||||
self.mysql_prompt_re = r'MariaDB \[[^]]+\]> '
|
||||
|
||||
def __enter__(self) -> 'SshAccessedMysqlConnection':
|
||||
# Set up the connection (e.g., open SSH tunnel, connect to MySQL)
|
||||
|
||||
command = f'ssh -t "{self.ssh_mysql_db.ssh_user}@{self.ssh_mysql_db.db_server_fqdn}" "mysql --defaults-group-suffix={self.ssh_mysql_db.db_user}"'
|
||||
# command = [
|
||||
# 'ssh',
|
||||
# '-t',
|
||||
# f"{self.ssh_mysql_db.ssh_user}@{self.ssh_mysql_db.db_server_fqdn}",
|
||||
# f'mysql --defaults-group-suffix={self.ssh_mysql_db.db_user}'
|
||||
# ]
|
||||
|
||||
logging.debug("SshAccessedMysqlConnection.__enter__: running ssh command '%s'", command)
|
||||
try:
|
||||
self.mysql_spawn = pexpect.spawn(command, encoding='utf-8', timeout=10)
|
||||
assert isinstance(self.mysql_spawn, pexpect.spawn)
|
||||
# graffy@alambix50:~$ /usr/bin/ssh -t qumandbw@alambix-master.ipr.univ-rennes.fr 'mysql --defaults-group-suffix=qumandbw'
|
||||
# Welcome to the MariaDB monitor. Commands end with ; or \g.
|
||||
# Your MariaDB connection id is 845
|
||||
# Server version: 10.11.14-MariaDB-0+deb12u2 Debian 12
|
||||
|
||||
# Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
|
||||
|
||||
# Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
|
||||
|
||||
# MariaDB [(none)]>
|
||||
self.mysql_spawn.expect(self.mysql_prompt_re) # Wait for MySQL prompt
|
||||
|
||||
show_databases = False
|
||||
if show_databases:
|
||||
self.mysql_spawn.sendline("show databases;")
|
||||
self.mysql_spawn.expect(self.mysql_prompt_re)
|
||||
logging.debug("databases:")
|
||||
logging.debug(self.mysql_spawn.before) # Print output
|
||||
|
||||
self.mysql_spawn.sendline(f"use {self.ssh_mysql_db.db_name};")
|
||||
self.mysql_spawn.expect(self.mysql_prompt_re)
|
||||
|
||||
except ValueError as e:
|
||||
logging.error("failed to execute the command '%s' (error: %s)", command, str(e))
|
||||
traceback.print_exc()
|
||||
raise RuntimeError(f"Error while connecting to mysql: {str(e)}") from e
|
||||
return self # Return the connection object for use in the 'with'
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
# Clean up the connection
|
||||
logging.debug("SshAccessedMysqlConnection.__exit__: Closing connection...")
|
||||
self.mysql_spawn.sendline("quit")
|
||||
self.mysql_spawn.expect(pexpect.EOF)
|
||||
self.mysql_spawn = None
|
||||
if exc_type:
|
||||
logging.error("An error occurred: %s", exc_val)
|
||||
return False # Propagate exceptions
|
||||
|
||||
def query(self, sql_query: SqlQuery) -> List[str]:
|
||||
"""
|
||||
:param str sql_query: the sql query to perform
|
||||
"""
|
||||
try:
|
||||
self.mysql_spawn.sendline(sql_query)
|
||||
self.mysql_spawn.expect(self.mysql_prompt_re)
|
||||
|
||||
stdout = self.mysql_spawn.before
|
||||
except pexpect.exceptions.ExceptionPexpect as e:
|
||||
logging.error("SshAccessedMysqlDb.query:: error while executing query '%s': %s", sql_query, str(e))
|
||||
raise RuntimeError(f"Error while executing query '{sql_query}': {e}") from e
|
||||
logging.debug("SshAccessedMysqlDb.query:: results of query '%s': %s", sql_query, stdout)
|
||||
rows = stdout.split('\n') if stdout != '' else []
|
||||
# detect errors in the output such as:
|
||||
# ERROR 1142 (42000): DELETE command denied to user 'qumandbw'@'localhost' for table `quman`.`disables`
|
||||
for row in rows:
|
||||
m = re.match(r'^ERROR ', row)
|
||||
if m is not None:
|
||||
raise RuntimeError(f'the sql query "{sql_query}" failed with the error : "{row}"')
|
||||
return rows
|
||||
|
||||
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
|
||||
# MariaDB [quman]> SELECT JSON_ARRAYAGG(JSON_OBJECT('toto', timestamp, 'iddd', user_id)) FROM log;
|
||||
# +-------------------------------------------------------------------------------------------------------+
|
||||
# | JSON_ARRAYAGG(JSON_OBJECT('toto', timestamp, 'iddd', user_id)) |
|
||||
# +-------------------------------------------------------------------------------------------------------+
|
||||
# | [{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}] |
|
||||
# +-------------------------------------------------------------------------------------------------------+
|
||||
# 1 row in set (0,006 sec)
|
||||
# columns_list_str = ', '.join([f'"{col}"' for col in columns])
|
||||
columns_list_str = ', '.join([f"'{col}', {col}" for col in columns])
|
||||
columns_statement = f'JSON_ARRAYAGG(JSON_OBJECT({columns_list_str}))'
|
||||
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{columns_statement} FROM {table}"
|
||||
if join_clause:
|
||||
sql_query += f" JOIN {join_clause}"
|
||||
if where_clause:
|
||||
sql_query += f" WHERE {where_clause}"
|
||||
sql_query += ';'
|
||||
lines = self.query(sql_query)
|
||||
table = parse_mysql_stdout('\n'.join(lines))
|
||||
assert table.num_cols == 1, f'Unexpected case: the query should return only one column with the json data but got {table.num_cols} columns.'
|
||||
assert table.num_rows == 1, f'Unexpected case: the query should return only one row with the json data but got {table.num_rows} rows.'
|
||||
json_str = table.get_value(row=0, col=0)
|
||||
# json_str is expected to contain a string like '[{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}]'
|
||||
assert isinstance(json_str, str), f'Expected a string as the value of the only cell in the table output but got {type(json_str)}'
|
||||
# logging.debug("SshAccessedMysqlDb.query_select:: value (in the form of a stringified json tree) of the cell for the query '%s': '%s'", sql_query, json_str)
|
||||
# match = re.match(r'^\s*(?P<json_data>{[^}]*})\s*$', data_line)
|
||||
# assert match, 'Unexpected output format for query "%s" : %s' % (sql_query, stdout)
|
||||
# json_data = json.loads(match.group('json_data'))
|
||||
t = json_to_table(json_str)
|
||||
assert isinstance(t, Table), f'Unexpected case: json_to_table should return a Table object but got {type(t)}'
|
||||
return t
|
||||
|
||||
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
|
||||
logging.debug("SshAccessedMysqlDb.query_insert:: values = %s", values)
|
||||
# INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_id, reason) VALUES ('2026-05-26T15:31:53.564287', 'graffy', 'alambix50.ipr.univ-rennes.fr', 'gpuonly.q@alambix104.ipr.univ-rennes1.fr', 'disable', 'quman-sync', 'synchronized with grid engine');SELECT JSON_OBJECT('toto', last_insert_id());
|
||||
# Query OK, 1 row affected (0,046 sec)
|
||||
|
||||
# +---------------------------------------+
|
||||
# | JSON_OBJECT('toto', last_insert_id()) |
|
||||
# +---------------------------------------+
|
||||
# | {"toto": 9} |
|
||||
# +---------------------------------------+
|
||||
# 1 row in set (0,002 sec)
|
||||
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
|
||||
sql_query = sql_query + ";SELECT JSON_OBJECT('toto', last_insert_id());"
|
||||
logging.debug("SshAccessedMysqlDb.query_insert:: sql_query = '%s'", sql_query)
|
||||
stdout_lines = self.query(sql_query)
|
||||
table: Table = parse_mysql_stdout('\n'.join(stdout_lines))
|
||||
assert table.num_cols == 1, f'Unexpected case: the query should return only one column with the json data but got {table.num_cols} columns.'
|
||||
assert table.num_rows == 1, f'Unexpected case: the query should return only one row with the json data but got {table.num_rows} rows.'
|
||||
data_line = table.get_value(row=0, col=0)
|
||||
assert isinstance(data_line, str), f'Expected a string as data line in the query output but got {type(data_line)}'
|
||||
logging.debug("SshAccessedMysqlDb.query_insert:: data line of query '%s': '%s'", sql_query, data_line)
|
||||
match = re.match(r'^\s*(?P<json_data>{\s*"toto"\s*:\s*(\d+)\s*})\s*$', data_line)
|
||||
assert match, 'Unexpected output format for query "%s" : %s' % (sql_query, data_line)
|
||||
json_data = json.loads(match.group('json_data'))
|
||||
last_insert_id = int(json_data['toto'])
|
||||
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
|
||||
return last_insert_id
|
||||
|
||||
def table_exists(self, table_name: str) -> bool:
|
||||
rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
|
||||
table: Table = parse_mysql_stdout('\n'.join(rows))
|
||||
logging.debug('len(rows) = %d', len(rows))
|
||||
logging.debug('rows = %s', str(rows))
|
||||
assert table.num_rows <= 1, f'Unexpected case: more than one ({len(table)}) tables match the table name {table_name}.'
|
||||
if table.num_rows > 0:
|
||||
assert table.column_names[0].startswith('Tables_in_'), f'Unexpected case: the column name in the output of "SHOW TABLES LIKE ..." should start with "Tables_in_" but got "{table.column_names[0]}"'
|
||||
assert table.get_value(row=0, col=0) == table_name, f'Unexpected case: the table name in the output of "SHOW TABLES LIKE ..." should be "{table_name}" but got "{table.get_value(row=0, col=0)}"'
|
||||
return table.num_rows == 1
|
||||
|
||||
def create_table(self, table_name: str, fields: List[SqlTableField]):
|
||||
# https://www.sqlite.org/autoinc.html
|
||||
# > The AUTOINCREMENT keyword imposes extra CPU, memory, disk space, and disk I/O overhead and should be avoided if not strictly needed. It is usually not needed.
|
||||
fields_sql_descriptions = []
|
||||
for field in fields:
|
||||
sql_field_type = {
|
||||
SqlTableField.Type.FIELD_TYPE_FLOAT: 'real NOT NULL',
|
||||
SqlTableField.Type.FIELD_TYPE_INT: 'int(11) NOT NULL',
|
||||
SqlTableField.Type.FIELD_TYPE_STRING: 'varchar(256) NOT NULL',
|
||||
SqlTableField.Type.FIELD_TYPE_TIME: 'datetime NOT NULL',
|
||||
}[field.field_type]
|
||||
if field.is_autoinc_index:
|
||||
assert field.field_type == SqlTableField.Type.FIELD_TYPE_INT
|
||||
sql_field_type = 'INTEGER PRIMARY KEY AUTO_INCREMENT'
|
||||
fields_sql_description = self.get_field_directive(field.name, sql_field_type, field.description)
|
||||
fields_sql_descriptions.append(fields_sql_description)
|
||||
|
||||
sql_create_table_command = f'CREATE TABLE `{table_name}` ({",".join(fields_sql_descriptions)});'
|
||||
logging.debug('sql_create_table_command = %s', sql_create_table_command)
|
||||
self.query(sql_create_table_command)
|
||||
|
||||
def delete_table(self, table_name: str):
|
||||
sql_create_table_command = f'DROP TABLE `{table_name}`;'
|
||||
self.query(sql_create_table_command)
|
||||
|
||||
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
|
||||
return f'`{field_name}` {field_sql_type} COMMENT \'{field_description}\''
|
||||
|
||||
def dump(self, sql_file_path: Path):
|
||||
# mysqldump -u root --quote-names --opt --single-transaction --quick $db >
|
||||
db = self.ssh_mysql_db
|
||||
command = f'ssh "{db.ssh_user}@{db.db_server_fqdn}" "mysqldump --defaults-group-suffix={db.db_user}" {db.db_name} > {sql_file_path}'
|
||||
_ = subprocess.run(command, shell=True, check=False, capture_output=True)
|
||||
|
||||
|
||||
class SshAccessedMysqlDb(ISqlDatabaseBackend):
|
||||
"""a mysql database server accessed using ssh instead of a remote mysql client
|
||||
|
||||
Instead of accessing the remote sql database from mysql client, this method ssh connects (we expect an unattended connection setup using ssh keys) to the server hosting the database, and once logged in performs a request to the database locally using mysql client running on the server hosting the database.
|
||||
|
||||
This method has a the following benefits over the simple use of mysql:
|
||||
- it's more secure since it benefits from ssh encryption
|
||||
- it doesn't require extra open tcp ports for remote mysql database access
|
||||
|
||||
"""
|
||||
db_server_fqdn: str # the fully qualified domain name of the server hosting the database, eg iprbenchdb.ipr.univ-rennes1.fr
|
||||
db_user: str # the user for accessing the inventory database, eg iprbenchw
|
||||
db_name: str # the name of the database, eg iprbench
|
||||
ssh_user: str # the user on db_server_fqdn that has access to the database db_name
|
||||
|
||||
def __init__(self, db_server_fqdn: str, db_user: str, db_name: str, ssh_user: str):
|
||||
"""
|
||||
:param str db_server_fqdn: the fully qualified domain name of the server hosting the database, eg iprbenchdb.ipr.univ-rennes1.fr
|
||||
:param str db_user: the user for accessing the inventory database, eg iprbenchw
|
||||
:param str db_name: the name of the database, eg iprbench
|
||||
:param str ssh_user: the user on db_server_fqdn that has access to the database db_name
|
||||
"""
|
||||
self.db_server_fqdn = db_server_fqdn
|
||||
self.db_user = db_user
|
||||
self.db_name = db_name
|
||||
self.ssh_user = ssh_user
|
||||
|
||||
def connect(self) -> SshAccessedMysqlConnection:
|
||||
|
||||
return SshAccessedMysqlConnection(self)
|
||||
|
||||
|
||||
class SqliteConnection(ISqlConnection):
|
||||
sqlite_db: 'SqliteDb'
|
||||
_con: sqlite3.Connection
|
||||
_cur: sqlite3.Cursor
|
||||
|
||||
def __init__(self, sql_db: 'SqliteDb'):
|
||||
self.sqlite_db = sql_db
|
||||
self._con = None
|
||||
self._cur = None
|
||||
|
||||
def __enter__(self) -> 'SqliteConnection':
|
||||
# Set up the connection (e.g., open SSH tunnel, connect to MySQL)
|
||||
logging.debug("Opening connection...")
|
||||
|
||||
check_same_thread = False
|
||||
# this is to prevent the following error when run from apache/django : SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 139672342353664 and this is thread id 139672333960960
|
||||
# accordig to https://stackoverflow.com/questions/48218065/programmingerror-sqlite-objects-created-in-a-thread-can-only-be-used-in-that-sa this is ok, as long as there are no concurrent writes
|
||||
# If set False, the returned connection may be shared across multiple threads. When using multiple threads with the same connection writing operations should be serialized by the user to avoid data corruption
|
||||
# I hope it's safe here but I'm not 100% sure though. Anyway, if the database gets corrupt, it not a big deal since this memory resident database gets reconstructed from the sql file...
|
||||
sqlite_db_path = self.sqlite_db.sqlite_db_path
|
||||
if sqlite_db_path == ':memory:' or not sqlite_db_path.exists():
|
||||
logging.debug('creating sqlite database in %s', sqlite_db_path)
|
||||
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
|
||||
else:
|
||||
logging.debug('reusing existing sqlite database in %s', sqlite_db_path)
|
||||
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
|
||||
self._cur = self._con.cursor()
|
||||
logging.debug('self._con = %s', self._con)
|
||||
logging.debug('self._cur = %s', self._cur)
|
||||
|
||||
_ = self.query('PRAGMA encoding="UTF-8";')
|
||||
|
||||
return self # Return the connection object for use in the 'with'
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
# Clean up the connection
|
||||
logging.debug("Closing connection...")
|
||||
if exc_type:
|
||||
logging.error("An error occurred: %s", exc_val)
|
||||
return False # Propagate exceptions
|
||||
|
||||
def query(self, sql_query) -> List[str]:
|
||||
"""
|
||||
:param str sql_query: the sql query to perform
|
||||
"""
|
||||
self._cur.execute(sql_query)
|
||||
rows = self._cur.fetchall()
|
||||
self._con.commit()
|
||||
return rows
|
||||
|
||||
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
|
||||
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{', '.join(columns)} FROM {table}"
|
||||
if join_clause:
|
||||
sql_query += f" JOIN {join_clause}"
|
||||
if where_clause:
|
||||
sql_query += f" WHERE {where_clause}"
|
||||
rows = self.query(sql_query)
|
||||
return Table(column_names=columns, rows=rows)
|
||||
|
||||
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
|
||||
"""
|
||||
performs an insert query on the sql database and returns the id of the inserted row
|
||||
|
||||
:param str table_id: the name of the table to insert into
|
||||
:param List[str] fields: the list of fields to insert values into
|
||||
:param List[tuple] values: the list of values to insert (one tuple per row, tuple values are the column values)
|
||||
:return: the id of the inserted row
|
||||
"""
|
||||
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
|
||||
logging.debug("SqliteDb.query_insert:: sql_query = '%s'", sql_query)
|
||||
logging.debug("SqliteDb.query_insert:: values = %s", values)
|
||||
self._cur.execute(sql_query)
|
||||
self._cur.execute('SELECT last_insert_rowid();')
|
||||
rows = self._cur.fetchall()
|
||||
self._con.commit()
|
||||
assert len(rows) == 1, f'Unexpected number of rows ({len(rows)}).'
|
||||
last_insert_id = int(rows[0][0])
|
||||
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
|
||||
return last_insert_id
|
||||
|
||||
def table_exists(self, table_name: str) -> bool:
|
||||
rows = self.query(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
|
||||
assert len(rows) <= 1, f'Unexpected case: more than one ({len(rows)}) tables match the table name {table_name}.'
|
||||
return len(rows) == 1
|
||||
|
||||
def create_table(self, table_name: str, fields: List[SqlTableField]):
|
||||
# https://www.sqlite.org/autoinc.html
|
||||
# > The AUTOINCREMENT keyword imposes extra CPU, memory, disk space, and disk I/O overhead and should be avoided if not strictly needed. It is usually not needed.
|
||||
fields_sql_descriptions = []
|
||||
for field in fields:
|
||||
sql_field_type = {
|
||||
SqlTableField.Type.FIELD_TYPE_FLOAT: 'real NOT NULL',
|
||||
SqlTableField.Type.FIELD_TYPE_INT: 'int(11) NOT NULL',
|
||||
SqlTableField.Type.FIELD_TYPE_STRING: 'varchar(256) NOT NULL',
|
||||
SqlTableField.Type.FIELD_TYPE_TIME: 'datetime NOT NULL',
|
||||
}[field.field_type]
|
||||
if field.is_autoinc_index:
|
||||
assert field.field_type == SqlTableField.Type.FIELD_TYPE_INT
|
||||
sql_field_type = 'INTEGER PRIMARY KEY'
|
||||
fields_sql_description = self.get_field_directive(field.name, sql_field_type, field.description)
|
||||
fields_sql_descriptions.append(fields_sql_description)
|
||||
|
||||
sql_create_table_command = f'CREATE TABLE `{table_name}` ({",".join(fields_sql_descriptions)});'
|
||||
logging.debug('sql_create_table_command = %s', sql_create_table_command)
|
||||
self.query(sql_create_table_command)
|
||||
|
||||
def delete_table(self, table_name: str):
|
||||
sql_create_table_command = f'DROP TABLE `{table_name}`;'
|
||||
self.query(sql_create_table_command)
|
||||
|
||||
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
|
||||
# sqlite doesn't understand the COMMENT keyword, so we use sql comments ( "--" ), as explained in [https://stackoverflow.com/questions/7426205/sqlite-adding-comment-on-descriptions-to-tables-and-columns]
|
||||
return f'`{field_name}` {field_sql_type} -- {field_description}\n'
|
||||
|
||||
def dump(self, sql_file_path: Path):
|
||||
with open(sql_file_path, 'wt', encoding='utf8') as f:
|
||||
for line in self._con.iterdump():
|
||||
f.write(line)
|
||||
|
||||
|
||||
class SqliteDb(ISqlDatabaseBackend):
|
||||
sqlite_db_path: Union[Path, str] # the path of the sqlite database. ':memory:' # sqlite-specific special name for a file stored in memory. We could use something like '/tmp/simpadb.sqlite' here but this would make parsing really slow (1 minute instead of 1s), unless either :
|
||||
|
||||
def __init__(self, sqlite_db_path: Path):
|
||||
self.sqlite_db_path = sqlite_db_path
|
||||
|
||||
def connect(self) -> SqliteConnection:
|
||||
|
||||
return SqliteConnection(self)
|
||||
|
||||
|
||||
class SqlFile(SqliteDb):
|
||||
_sql_file_path: Path
|
||||
|
||||
def __init__(self, sql_file_path: Path, truncate_hex_strings=False):
|
||||
"""
|
||||
:param str sql_file_path: the path of the sql file containing the inventory database
|
||||
"""
|
||||
self._sql_file_path = sql_file_path
|
||||
# _con: sqlite3.Connection = None
|
||||
# _cur: sqlite3.Cursor = None
|
||||
|
||||
sqlite_db_path = ':memory:' # sqlite-specific special name for a file stored in memory. We could use something like '/tmp/simpadb.sqlite' here but this would make parsing really slow (1 minute instead of 1s), unless either :
|
||||
# - proper fix : group of INSERT statements are surrounded by BEGIN and COMMIT (see http://stackoverflow.com/questions/4719836/python-and-sqlite3-adding-thousands-of-rows)
|
||||
# - the file is stored on a solid state disk
|
||||
try:
|
||||
os.remove(sqlite_db_path)
|
||||
except BaseException: # pylint: disable=broad-except
|
||||
pass
|
||||
super().__init__(sqlite_db_path)
|
||||
with open(str(self._sql_file_path), 'r', encoding='utf8') as f: # str conversion has been added to support older versions of python in which open don't accept arguments of type Path
|
||||
sql = f.read() # watch out for built-in `str`
|
||||
# print(sql)
|
||||
|
||||
sqlite_sql = mysql_to_sqlite(sql, truncate_hex_strings)
|
||||
# print(mysql_to_sqlite(sql))
|
||||
# with open('/tmp/toto.sqlite.sql', 'w') as f:
|
||||
# f.write(sqlite_sql)
|
||||
# with open('/tmp/toto.sqlite.sql', 'r') as f:
|
||||
# sqlite_sql = f.read()
|
||||
with self.connect() as conn:
|
||||
conn._cur.executescript(sqlite_sql)
|
||||
|
||||
|
||||
class TableAttrNotFound(Exception):
|
||||
def __init__(self, table, key_name, key_value, attr_name):
|
||||
message = "failed to find in table %s a value for %s where %s is %s" % (table, attr_name, key_name, key_value)
|
||||
super(TableAttrNotFound, self).__init__(message)
|
||||
self.table = table
|
||||
self.key_name = key_name
|
||||
self.key_value = key_value
|
||||
self.attr_name = attr_name
|
||||
|
||||
|
||||
class SqlDatabaseReader(object):
|
||||
|
||||
def __init__(self, inv_provider: ISqlDatabaseBackend):
|
||||
"""
|
||||
:param ISqlDatabaseBackend inv_provider: the input that provides the inventory data
|
||||
"""
|
||||
self._inv_provider = inv_provider
|
||||
|
||||
def query(self, sql_query: SqlQuery):
|
||||
"""
|
||||
performs a query on the sql database
|
||||
|
||||
:param SqlQuery sql_query: the sql query to perform
|
||||
"""
|
||||
return self._inv_provider.query(sql_query)
|
||||
|
||||
def get_table_attr(self, table, key_name, key_value, attr_name):
|
||||
"""
|
||||
reads the value of the fiven attribute of the given item in the given table
|
||||
|
||||
:param str table: the name of the table to read
|
||||
:param str key_name: the name of the column that stores the id of the item to read
|
||||
:param str key_value: the id of the item to read
|
||||
:param str attr_name: the name of the attribute to read from the item
|
||||
"""
|
||||
attr_value = None
|
||||
rows = self.query("SELECT " + attr_name + " FROM " + table + " WHERE " + key_name + "='" + key_value + "'")
|
||||
if len(rows) > 0:
|
||||
attr_value = rows[0][0]
|
||||
else:
|
||||
raise TableAttrNotFound(table, key_name, key_value, attr_name)
|
||||
return attr_value
|
||||
|
|
@ -1,3 +1,6 @@
|
|||
__version__ = '1.0.32'
|
||||
|
||||
|
||||
class Version(object):
|
||||
"""
|
||||
simple version number made of a series of positive integers separated by dots
|
||||
|
|
|
|||
|
|
@ -0,0 +1,39 @@
|
|||
[build-system]
|
||||
requires = ["setuptools"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "cocluto"
|
||||
dynamic = ["version"] # the list of fields whose values are dicovered by the backend (eg __version__)
|
||||
description = "compute cluster utility tools"
|
||||
readme = "README.md"
|
||||
keywords = ["sql", "hpc", "pdu", "power supply", "inventory", "son of grid engine"]
|
||||
license = {text = "MIT License"}
|
||||
dependencies = [
|
||||
"pygraphviz", # requires apt install graphviz-dev
|
||||
"mysqlclient",
|
||||
"pexpect"
|
||||
]
|
||||
requires-python = ">= 3.8"
|
||||
authors = [
|
||||
{name = "Guillaume Raffy", email = "guillaume.raffy@univ-rennes.fr"}
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
quman = "cocluto.quman:main"
|
||||
|
||||
[project.urls]
|
||||
Repository = "https://git.ipr.univ-rennes.fr/cellinfo/cocluto"
|
||||
|
||||
[tool.setuptools]
|
||||
packages = [
|
||||
"cocluto",
|
||||
"cocluto.ClusterController",
|
||||
"cocluto.Ipmi",
|
||||
"cocluto.SunGridEngine",]
|
||||
|
||||
[tool.setuptools.dynamic]
|
||||
version = {attr = "cocluto.version.__version__"}
|
||||
|
||||
[tool.setuptools.package-data]
|
||||
iprbench = ["resources/**/*"]
|
||||
2
setup.py
2
setup.py
|
|
@ -2,7 +2,7 @@ from setuptools import setup
|
|||
|
||||
setup(
|
||||
name='cocluto',
|
||||
version=1.06,
|
||||
version='1.0.9',
|
||||
description='compute cluster utility tools',
|
||||
url='https://git.ipr.univ-rennes1.fr/graffy/cocluto',
|
||||
author='Guillaume Raffy',
|
||||
|
|
|
|||
|
|
@ -0,0 +1,63 @@
|
|||
from pathlib import Path
|
||||
import json
|
||||
import unittest
|
||||
import logging
|
||||
# from cocluto import ClusterController
|
||||
from cocluto.sqldb import SqliteDb
|
||||
from cocluto.quman import QueueManager, init_db, MockGridEngine, QueuesStatus
|
||||
|
||||
|
||||
class QumanTestCase(unittest.TestCase):
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def setUp(self) -> None: # pylint: disable=useless-super-delegation
|
||||
return super().setUp()
|
||||
|
||||
def test_quman(self):
|
||||
logging.info('test_quman')
|
||||
db_path = Path('./quman_test/quman.sqlite')
|
||||
db_path.parent.mkdir(exist_ok=True)
|
||||
if db_path.exists():
|
||||
db_path.unlink()
|
||||
db_backend = SqliteDb(db_path)
|
||||
init_db(db_backend)
|
||||
qs = QueuesStatus()
|
||||
for node_id in range(40, 44):
|
||||
qs.add_queue(f'main.q@alambix{node_id}', True)
|
||||
qs.add_queue('gpuonly.q@alambix42', True)
|
||||
grid_engine = MockGridEngine(qs)
|
||||
grid_engine.disable_queue_machine('main.q@alambix42') # simulate that the queue is already disabled)
|
||||
with db_backend.connect() as sql_conn:
|
||||
quman = QueueManager(sql_conn, grid_engine)
|
||||
print('queues state:')
|
||||
grid_engine.queues_status.print()
|
||||
print('disable requests:')
|
||||
print(json.dumps(quman.get_state().as_dict(), indent=2))
|
||||
print('synchronizing with grid engine...')
|
||||
quman.synchronize_with_grid_engine()
|
||||
print('queues state:')
|
||||
grid_engine.queues_status.print()
|
||||
print('disable requests:')
|
||||
print(json.dumps(quman.get_state().as_dict(), indent=2))
|
||||
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
|
||||
with self.assertRaises(RuntimeError):
|
||||
# attempting to disable the same queue again with the same disable tag should raise an assertion error (the tag is used to uniquely identify the disables on the machine)
|
||||
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'because I want to test quman')
|
||||
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'croconaus.maco-update', 'disabled to update maco')
|
||||
quman.request_queue_machines_activation(['main.q@alambix42'], 'sysadmin.graffy', 'alambix42 has been moved to a new rack')
|
||||
main_queue_machines = quman.get_queue_machines('main.q')
|
||||
quman.request_queue_machines_deactivation(main_queue_machines, 'sysadmin.graffy.bug4242', 'disable all cluster to prepare complete shutdown')
|
||||
main_queue_machines = quman.get_queue_machines('main.q')
|
||||
quman.request_queue_machines_activation(main_queue_machines, 'sysadmin.graffy.bug4242', 'electricity is back, reactivating all cluster')
|
||||
quman.synchronize_with_grid_engine()
|
||||
|
||||
sql_conn.dump(Path('./quman_test/quman_dump.sql'))
|
||||
print('queues state:')
|
||||
grid_engine.queues_status.print()
|
||||
print('disable requests:')
|
||||
print(json.dumps(quman.get_state().as_dict(), indent=2))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
@ -0,0 +1,114 @@
|
|||
from typing import Any
|
||||
import unittest
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from cocluto.sqldb import parse_mysql_stdout, ISqlDatabaseBackend, SqliteDb, SshAccessedMysqlDb, SqlTableField
|
||||
|
||||
|
||||
def stringify(value: Any):
|
||||
return f"'{str(value)}'"
|
||||
|
||||
|
||||
def test_sql_backend(sql_backend: ISqlDatabaseBackend):
|
||||
table_name = 'colutotestmamul1'
|
||||
fields = [
|
||||
SqlTableField('measure_id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the measurement', is_autoinc_index=True),
|
||||
SqlTableField('measurement_time', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this measurment has been made'),
|
||||
SqlTableField('cpu_model', SqlTableField.Type.FIELD_TYPE_STRING, 'The exact model of the cpu running the benchmark eg "Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz"'),
|
||||
SqlTableField('duration', SqlTableField.Type.FIELD_TYPE_FLOAT, 'the duration of the benchmark (in seconds)'),
|
||||
]
|
||||
with sql_backend.connect() as sql_conn:
|
||||
if sql_conn.table_exists(table_name):
|
||||
sql_conn.delete_table(table_name)
|
||||
sql_conn.create_table(table_name, fields)
|
||||
|
||||
# test ISqlConnection.query method
|
||||
measurements = [
|
||||
{
|
||||
'measurement_time': '1951-12-13 12:34:50',
|
||||
'cpu_model': 'Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz',
|
||||
'duration': 0.42
|
||||
},
|
||||
{
|
||||
'measurement_time': '1985-10-26 09:00:00',
|
||||
'cpu_model': 'Intel(R) Core(TM) i3-1234U CPU @ 1.42GHz',
|
||||
'duration': 3.14
|
||||
}
|
||||
]
|
||||
|
||||
for measurement in measurements:
|
||||
sql_query = f'insert into {table_name}({", ".join([field.name for field in fields if not field.is_autoinc_index])}) values({", ".join([stringify(measurement[field.name]) for field in fields if not field.is_autoinc_index])});'
|
||||
logging.debug('sql_query = %s', sql_query)
|
||||
sql_conn.query(sql_query)
|
||||
table = sql_conn.query_select(['measurement_time', 'duration'], table=table_name)
|
||||
assert table.num_rows == 2, f'Unexpected case: the number of rows in the result of the query should be 2 but got {table.num_rows}.'
|
||||
|
||||
# test ISqlConnection.query_insert method
|
||||
measurements2 = [
|
||||
{
|
||||
'measurement_time': '1955-11-12 06:38:00',
|
||||
'cpu_model': 'Intel(R) Core(TM) i7-6666U CPU @ 2.50GHz',
|
||||
'duration': 2.71
|
||||
},
|
||||
]
|
||||
|
||||
expected_measurement_index = 3 # because the first measurement inserted in the table has measure_id = 1 and measure_id is an autoinc index
|
||||
for measurement in measurements2:
|
||||
fields = [field.name for field in fields if not field.is_autoinc_index]
|
||||
values = tuple(measurement[field] for field in fields)
|
||||
measurement_index = sql_conn.query_insert(table_id=table_name, fields=fields, values=[values])
|
||||
assert measurement_index == expected_measurement_index, f'Unexpected case: the index of the inserted measurement should be {expected_measurement_index} (because the first measurement inserted in the table has measure_id = 1 and measure_id is an autoinc index) but got {measurement_index}.'
|
||||
expected_measurement_index += 1
|
||||
|
||||
# test ISqlConnection.query_select method
|
||||
|
||||
table = sql_conn.query_select(['measurement_time', 'duration'], table=table_name, where_clause='cpu_model = "Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz"')
|
||||
assert table.num_rows == 1, f'Unexpected case: the number of rows in the result of the query should be 1 but got {table.num_rows}.'
|
||||
assert table.rows == [('1951-12-13 12:34:50', 0.42)], f'Unexpected case: the rows in the result of the query should be [("1951+12-13 12:34:50", 0.42)] but got {table.rows}.'
|
||||
|
||||
# test ISqlConnection.dump method
|
||||
sql_conn.dump(Path('/tmp/toto.sql'))
|
||||
sql_conn.delete_table(table_name)
|
||||
|
||||
|
||||
class SimpadbTestCase(unittest.TestCase):
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def setUp(self) -> None: # pylint: disable=useless-super-delegation
|
||||
return super().setUp()
|
||||
|
||||
def test_mysql_output_parser(self):
|
||||
mysql_output = '''SHOW TABLES LIKE 'log';\n\
|
||||
+-----------------------+\n\
|
||||
| Tables_in_quman (log) |\n\
|
||||
+-----------------------+\n\
|
||||
| log |\n\
|
||||
+-----------------------+\n\
|
||||
1 row in set (0,001 sec)\n\
|
||||
\n\
|
||||
\n\
|
||||
'''
|
||||
table = parse_mysql_stdout(mysql_output)
|
||||
self.assertEqual(table.column_names, ['Tables_in_quman (log)'])
|
||||
self.assertEqual(table.rows, [('log',)])
|
||||
|
||||
def test_sqlite_backend(self):
|
||||
logging.info('test_sqlite_backend')
|
||||
backend = SqliteDb(Path('/tmp/toto.sqlite'))
|
||||
test_sql_backend(backend)
|
||||
# self.assertIsInstance(job_state, JobsState)
|
||||
|
||||
def test_ssh_accessed_mysql_backend(self):
|
||||
logging.info('test_ssh_accessed_mysql_backend')
|
||||
db_server_fqdn = 'iprbenchsdb.ipr.univ-rennes1.fr'
|
||||
db_user = 'test_iprbenchw'
|
||||
db_name = 'test_iprbenchs'
|
||||
ssh_user = 'test_iprbenchw'
|
||||
|
||||
backend = SshAccessedMysqlDb(db_server_fqdn, db_user, db_name, ssh_user)
|
||||
test_sql_backend(backend)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Loading…
Reference in New Issue