32 Commits

Author SHA1 Message Date
Guillaume Raffy 9305967d10 cocluto v1.0.32 - added quman option --disable-sync
- the db synchronization mechanism needs some requirements that are not always available (sge commands such as qmod and qstat, write access on quman's database), as for example on simpaweb; in such environments, --disable-sync allows quman to work for query-only actions such as show-disable-requests

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-06-03 09:37:19 +02:00
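A flag such as `--disable-sync` can be wired into an argument parser roughly as follows. This is only a minimal sketch: the program name `quman-sketch`, the helper `needs_sync` and the reduced set of actions are assumptions made for illustration, not quman's actual code.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Builds a tiny parser with a --disable-sync switch (hypothetical layout)."""
    parser = argparse.ArgumentParser(prog='quman-sketch')
    # store_true makes the flag default to False, so the sync runs unless asked otherwise
    parser.add_argument('--disable-sync', action='store_true',
                        help='skip the synchronization of the database with the queues')
    parser.add_argument('action', choices=['show-disable-requests', 'enable', 'disable'])
    return parser


def needs_sync(args: argparse.Namespace) -> bool:
    """Returns True when the synchronization step should run before the action."""
    return not args.disable_sync
```

On a host without `qmod`, `qstat` or write access to the database, a query-only call would then pass `--disable-sync` before the action name.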
Guillaume Raffy 73522f83ed cocluto v1.0.31 - made SshAccessedMysqlConnection.query handle errors
- as a result, quman now displays a useful error message when a sql query fails
- also added information messages when quman is synchronizing quman database with the current state of the queues, as this process takes 5 seconds

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-06-02 18:17:40 +02:00
Guillaume Raffy e17287fcf8 - fixed pylint warnings
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-05-29 20:24:16 +02:00
Guillaume Raffy d9e2199d2e cocluto v1.0.30 - rewrote SimpaDbUtil.ISqlConnection as sqldb.ISqlConnection for performance reasons
- sqldb.ISqlConnection provides the same functionality as SimpaDbUtil.ISqlConnection, with much greater performance thanks to python contexts. However the interface is not compatible, so codes wanting to switch from SimpaDbUtil.ISqlConnection to sqldb.ISqlConnection need adjusting.
- migrated quman to use sqldb.ISqlConnection (as a result the sync now takes 3 seconds instead of 60, because of the drastic reduction of calls to mysql and ssh)
- SimpaDbUtil.ISqlConnection will be deleted once every code using SimpaDbUtil.ISqlConnection has been migrated to sqldb.ISqlConnection

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-05-29 20:11:28 +02:00
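The gain from using python contexts can be illustrated by opening a connection once and reusing it for several statements. The sketch below uses `sqlite3` as a stand-in; the names `connection`, `stats` and `count_rows_after_inserts` are hypothetical, and the real `sqldb.ISqlConnection` interface is not reproduced here.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator, List

stats = {'opens': 0}  # counts how many connections have been opened


@contextmanager
def connection(db_path: str) -> Iterator[sqlite3.Connection]:
    """Opens one connection, yields it, commits on success and always closes it."""
    stats['opens'] += 1
    con = sqlite3.connect(db_path)
    try:
        yield con
        con.commit()
    finally:
        con.close()


def count_rows_after_inserts(reasons: List[str]) -> int:
    """Runs several statements through a single connection and returns the row count."""
    with connection(':memory:') as con:
        con.execute('CREATE TABLE log (reason TEXT)')
        con.executemany('INSERT INTO log (reason) VALUES (?)', [(r,) for r in reasons])
        return con.execute('SELECT COUNT(*) FROM log').fetchone()[0]
```

All statements inside the `with` block share one connection, whereas opening a session per query (as happens when each query spawns its own ssh and mysql call) pays the setup cost every time.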
Guillaume Raffy a7f65a9e66 cocluto v1.0.29 - fixed bug that caused quman to not use the right domain name for alambix compute nodes
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-05-27 16:16:42 +02:00
Guillaume Raffy 0cac428957 cocluto v1.0.28 - fixed bug that caused quman to malfunction with SshAccessedMysqlDb backend
- replaced ISqlDatabaseBackend.get_last_insert_id() with ISqlDatabaseBackend.query_insert(), as ISqlDatabaseBackend.get_last_insert_id() cannot work with SshAccessedMysqlDb (each call always returns 0 because mysql starts a new session each time)

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-05-27 15:01:56 +02:00
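The session issue described above can be reproduced with `sqlite3`, used here purely as a stand-in for mysql: the last insert id is tracked per connection, so asking for it from a fresh connection yields 0. The table layout and function names below are assumptions made for this sketch.

```python
import os
import sqlite3
import tempfile
from typing import Tuple


def insert_and_get_id(con: sqlite3.Connection, reason: str) -> int:
    """Inserts a row and reads its id within the same session."""
    cur = con.execute('INSERT INTO log (reason) VALUES (?)', (reason,))
    return cur.lastrowid


def last_id_in_new_session(db_path: str) -> int:
    """Asks for the last insert id from a brand new connection."""
    con = sqlite3.connect(db_path)
    try:
        return con.execute('SELECT last_insert_rowid()').fetchone()[0]
    finally:
        con.close()


def demo() -> Tuple[int, int]:
    """Returns (id seen in the inserting session, id seen in a new session)."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        db_path = os.path.join(tmp_dir, 'quman_sketch.sqlite')
        con = sqlite3.connect(db_path)
        con.execute('CREATE TABLE log (id INTEGER PRIMARY KEY AUTOINCREMENT, reason TEXT)')
        same_session_id = insert_and_get_id(con, 'testing quman')
        con.commit()
        con.close()
        new_session_id = last_id_in_new_session(db_path)
    return same_session_id, new_session_id
```

This is why performing the insert and the id lookup in one call (as `query_insert()` does) is the safer design when every query may run in its own session.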
Guillaume Raffy d2c973c7ed cocluto v1.0.27 - fixes bug in returned table of SshAccessedMysqlDb.query('select...') and SshAccessedMysqlDb.get_last_insert_id()
- split ISqlDatabaseBackend.query() into ISqlDatabaseBackend.query() and the more specialized ISqlDatabaseBackend.query_select()
- this way the return type of query_select is explicit (a Table).
- it also allows robust parsing of sql query results (using json).
- replaced calls to ISqlDatabaseBackend.query('select ...') with calls to ISqlDatabaseBackend.query_select()

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-05-26 16:39:55 +02:00
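To illustrate why json makes the parsing of query results more robust, here is a minimal sketch of turning a JSON array of row objects (as produced by a `JSON_ARRAYAGG(JSON_OBJECT(...))` select) into a list of tuples. The function name `json_rows_to_table` is an assumption; it is a simplified variant, not the `json_to_table` function of the diff.

```python
import json
from typing import List, Tuple

Table = List[Tuple]


def json_rows_to_table(json_str: str) -> Table:
    """Converts a JSON array of row objects into a list of tuples (one per row)."""
    if json_str.strip() == 'NULL':
        # an aggregate over zero rows prints NULL instead of an empty array
        return []
    rows = json.loads(json_str)
    # dict order follows the order of the keys in the JSON text (Python 3.7+)
    return [tuple(row.values()) for row in rows]
```

Because values are JSON-escaped, a field containing a tab or a newline survives intact, which would not be the case when splitting the plain text output of the mysql client on whitespace.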
Guillaume Raffy 5856ac0951 cocluto v1.0.26 - fixed bug in quman's Sge backend that caused Sge.get_status() to fail
(it used the nonexistent method QueueMachine.is_enabled())

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-05-26 16:16:43 +02:00
Guillaume Raffy aa6c84ef80 cocluto v1.0.25 - changed default db_user and ssh_user to qumandbw instead of qumanw
- so that it matches the quman database setup used at IPR

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-05-26 16:12:06 +02:00
Guillaume Raffy 24bfb1af95 cocluto v1.0.24 - fixed bug in quman's Sge backend that caused QstatParser to fail
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-05-26 16:08:12 +02:00
Guillaume Raffy fb5e9fcfa4 cocluto v1.0.23 - fixed bug in the mechanism detecting the availability of qmod
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-05-21 17:43:03 +02:00
Guillaume Raffy eef27f1dd2 cocluto v1.0.22 - fixes a bug that causes quman to fail when using mysql backend
- added the method ISqlDatabaseBackend.get_last_insert_id because the actual sql function is different in sqlite (last_insert_rowid) and mysql (last_insert_id), so the query differs between the two backends

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-05-21 17:41:13 +02:00
Guillaume Raffy 6974f51221 cocluto v1.0.21 - added the option --db-def to quman
- this option allows quman to work with a user-configurable database system as a backend. The default uses the database for the alambix cluster
- also improved the output of show-disable-requests

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-10 17:58:26 +02:00
Guillaume Raffy c80e3cd382 cocluto v1.0.20 - fixes warnings when installing cocluto
```
WARNING: Missing build requirements in pyproject.toml for file:///home/graffy/work/simpaweb/cocluto.git.
  WARNING: The project does not specify a build backend, and pip cannot fall back to setuptools without 'wheel'.
```

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-10 13:52:04 +02:00
Guillaume Raffy 9266ad8278 cocluto v1.0.19 - added the update-db action to quman
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-10 13:28:36 +02:00
Guillaume Raffy 381ec9f1af cocluto v1.0.18 - fixed a bug that caused submodules to be excluded from install
eg problem visible with the command:
> /usr/bin/python3 -m pip install --root=/ git+https://git.ipr.univ-rennes.fr/cellinfo/cocluto to

then:
```sh
root@alambix50:~# quman
Traceback (most recent call last):
  File "/usr/local/bin/quman", line 5, in <module>
    from cocluto.quman import main
  File "/usr/local/lib/python3.11/dist-packages/cocluto/quman.py", line 13, in <module>
    from cocluto.ClusterController.QstatParser import QstatParser
```

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-10 10:18:06 +02:00
Guillaume Raffy f64e31b420 cocluto v1.0.17 - added json option to quman so that the user can choose to get results in machine friendly (json) or human friendly format
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-03 18:08:58 +02:00
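Offering both a machine-friendly and a human-friendly output usually comes down to one formatting switch. The sketch below assumes hypothetical field names (`disable_id`, `queue_machine`, `reason`) and a hypothetical function `format_disable_requests`; quman's real output layout may differ.

```python
import json
from typing import Dict, List


def format_disable_requests(requests: List[Dict[str, str]], as_json: bool) -> str:
    """Formats disable requests either as json or as one readable line per request."""
    if as_json:
        # machine-friendly: a json array that scripts can parse back
        return json.dumps(requests, indent=2)
    # human-friendly: one line per disable request
    lines = [f"{r['disable_id']}: {r['queue_machine']} ({r['reason']})" for r in requests]
    return '\n'.join(lines)
```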
Guillaume Raffy 04adbbc684 cocluto v1.0.16 - overhauled quman's argument parsing
- quman's argument parsing now uses subparsers to allow for more actions than enable and disable
- added the show-disables action to quman
- added a test mode for the command line quman, that allows the user to test quman using quman's user interface
- renamed disable tag as disable id, as it's more meaningful (identifies a disable request)
- made user errors more user-friendly (ie not polluted with call stack)

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-03 17:01:56 +02:00
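Subparsers let each action carry its own arguments. The following is a minimal sketch using the action names mentioned in this commit; the argument names and the program name `quman-sketch` are assumptions, not quman's actual command line.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Builds a parser with one subparser per action (hypothetical argument names)."""
    parser = argparse.ArgumentParser(prog='quman-sketch')
    subparsers = parser.add_subparsers(dest='action', required=True)

    for action in ('enable', 'disable'):
        sub = subparsers.add_parser(action)
        sub.add_argument('queue_machine')
        sub.add_argument('--disable-id', required=True)
        if action == 'disable':
            # only the disable action needs a reason
            sub.add_argument('--reason', required=True)

    # an action without any extra argument
    subparsers.add_parser('show-disables')
    return parser
```

Adding a new action then only requires one more `add_parser` call, without touching the arguments of the existing actions.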
Guillaume Raffy 25afd32504 cocluto v1.0.15 - added user friendly error message when qmod is not available
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-03 14:20:37 +02:00
Guillaume Raffy 4cc541d9c3 cocluto v1.0.14 - added support to full queues to quman
With this feature, it's now possible to disable an entire queue (and not only one queue machine), with

```sh
quman d main.q --disable-tag admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'
```
-  also replaced the term queue name with queue machine for clarity, now that we have both QueueMachineId and QueueId

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-03 11:58:47 +02:00
Guillaume Raffy bfe1eeb084 cocluto v1.0.13 - improvements to quman
- the userid and the host fqdn are now recorded
- found better names (more appropriate and easier to understand)
  - disable reason -> disable request
  - requester -> disable tag

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-02 19:31:31 +02:00
Guillaume Raffy 12be70500b fixed bug that caused quman test to fail because the temporary directory is missing
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-02 18:13:39 +02:00
Guillaume Raffy 7acaa1ad5a added continuous integration to cocluto
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-02 18:09:17 +02:00
Guillaume Raffy 0f0d5f800e v 1.0.12
- fixed bug in request_queue_activation, which caused it to always enable the queue, even if there are other disables
- added a synchronization mechanism to quman which patches the database to ensure the database is coherent with the current activation of the queues

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-01 19:08:06 +02:00
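The rule behind the fix (a queue is re-enabled only once no disable request remains) can be sketched as follows. The class `DisableRequests` and its in-memory storage are assumptions made for illustration; quman itself stores requests in a database.

```python
from typing import Dict, Set


class DisableRequests:
    """Tracks which disable ids currently hold each queue machine disabled."""

    def __init__(self) -> None:
        self._requests: Dict[str, Set[str]] = {}

    def request_deactivation(self, queue_machine: str, disable_id: str) -> bool:
        """Records a disable request; returns True if the queue must be disabled now."""
        holders = self._requests.setdefault(queue_machine, set())
        was_enabled = not holders
        holders.add(disable_id)
        return was_enabled

    def request_activation(self, queue_machine: str, disable_id: str) -> bool:
        """Removes a disable request; returns True only if no other disable remains."""
        holders = self._requests.get(queue_machine, set())
        holders.discard(disable_id)
        return not holders
```

With two requests on the same queue machine, withdrawing the first one returns `False`, so the queue stays disabled until the second one is withdrawn too.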
Guillaume Raffy f5dce0bf10 v 1.0.11
- added IGridEngine abstraction that makes it possible to switch between dry-run mode and normal mode
- added more commands to the test

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-01 17:31:45 +02:00
Guillaume Raffy a7d92a3f99 v 1.0.10
- fixed iteration bug in quman
- added missing unit test

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-01 16:33:46 +02:00
Guillaume Raffy 3beba78ecc started to develop quman, a tool to handle multiple disable requests on the same queue
- to make this work, cocluto now uses a pyproject.toml configuration
- cocluto v1.0.9
- this is still work in progress, but the basic behaviour

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
2026-04-01 16:12:17 +02:00
Guillaume Raffy ba3d410b3f now that non ipr (ietr) machines have been migrated from physix to alambix, updated the graphs generator accordingly to exclude these machines from the graphs.
work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3560]
fixes [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=4335]
2026-03-06 19:03:54 +01:00
Guillaume Raffy a8b477978d now powermap draws redundant cables as dashed lines.
fixes - Bug 4250 - inconsistency in the power consumptions displayed by powermap
2025-12-11 14:23:07 +01:00
Guillaume Raffy 8a2c46c377 cocluto v1.08
- fixed bug that caused `draw_machine_age_pyramid_graph` to fail when some cluster machines get older than 20 years. Increased the age limit to 25 years, and added clipping to the graph to cope with some machines even older than this

fixes [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=4101]
2025-06-24 16:10:09 +02:00
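The failure and its fix can be sketched with a plain list instead of a numpy array: an age beyond the histogram length would raise an `IndexError`, so such machines are left out. The function name `age_histogram` and its inputs (ages in days) are assumptions made for this sketch.

```python
from typing import List


def age_histogram(ages_in_days: List[int], oldest_age: int = 25) -> List[int]:
    """Counts machines per age in years, ignoring those at or beyond oldest_age."""
    histogram = [0] * oldest_age
    for age_days in ages_in_days:
        age_in_years = age_days // 365
        # without this guard, a machine older than the limit would index out of range
        if age_in_years < oldest_age:
            histogram[age_in_years] += 1
    return histogram
```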
Guillaume Raffy 8679ae2ca5 cocluto v 1.07
fixed logic error found when working on [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3979]
2024-11-15 16:41:41 +01:00
Guillaume Raffy 27493f2ed7 fixed logic error found when working on [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3979] 2024-11-15 16:40:23 +01:00
11 changed files with 1806 additions and 30 deletions

68
ci/ipr.jenkins Normal file

@ -0,0 +1,68 @@
// Jenkinsfile for jenkins.ipr.univ-rennes1.fr (Institut de Physique de Rennes)
pipeline {
agent {label 'alambix_agent'}
environment {
VENV_PATH = "${WORKSPACE}/cocluto.venv"
}
stages {
stage('setup') {
steps {
echo 'setting up itinv test environment'
sh """#!/bin/bash
python3 -m venv ${VENV_PATH} &&
source ${VENV_PATH}/bin/activate &&
pip install --upgrade pip &&
pip install --upgrade setuptools &&
pip install .
"""
}
}
stage('testing cocluto (cluster tools)') {
steps {
sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_cocluto
"""
}
}
stage('testing simpadb') {
steps {
sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_simpadb
"""
}
}
stage('testing sqldb') {
steps {
sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_sqldb
"""
}
}
stage('testing quman') {
steps {
sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_quman
"""
}
}
}
post
{
// always, success, failure, unstable, changed
failure
{
mail bcc: '', body: "<b>Jenkins build failed</b><br>Project: ${env.JOB_NAME} <br>Build Number: ${env.BUILD_NUMBER} <br>Build URL: ${env.BUILD_URL}", cc: 'guillaume.raffy@univ-rennes.fr, julien.dasilva@univ-rennes.fr', charset: 'UTF-8', from: '', mimeType: 'text/html', replyTo: '', subject: "CI build failed for ${env.JOB_NAME}", to: "info-ipr@univ-rennes1.fr";
}
cleanup {
cleanWs()
}
}
}


@ -501,25 +501,29 @@ def power_config_to_svg(power_config: PowerConfig, svg_file_path: Path, worst_ca
for con in power_config.connections:
# print(con.from_plug.machine.name, con.to_plug.machine.name)
if not con.is_redundancy_cable(): # don't display redundancy cables, as they might overlap and hide the main one
power_consumption = con.get_power_consumption(worst_case_scenario=worst_case_scenario)
amperes = power_consumption / 220.0
color = '/svg/green'
capacity = con.get_max_amperes()
penwidth_scaler = 0.25
if capacity is None:
max_amp = '? A'
color = '/svg/red'
penwidth = 100.0 * penwidth_scaler # make the problem clearly visible
else:
max_amp = str(capacity) + 'A'
color = cable_colorer.get_cable_color(con, worst_case_scenario=worst_case_scenario)
penwidth = capacity * penwidth_scaler
label = "%.1f/%s" % (amperes, max_amp)
# color='//%d' % int(9.0-amperes/capacity*8)
power_consumption = con.get_power_consumption(worst_case_scenario=worst_case_scenario)
amperes = power_consumption / 220.0
color = '/svg/green'
capacity = con.get_max_amperes()
penwidth_scaler = 0.25
if capacity is None:
max_amp = '? A'
color = '/svg/red'
penwidth = 100.0 * penwidth_scaler # make the problem clearly visible
else:
max_amp = str(capacity) + 'A'
color = cable_colorer.get_cable_color(con, worst_case_scenario=worst_case_scenario)
penwidth = capacity * penwidth_scaler
label = "%.1f/%s" % (amperes, max_amp)
# color='//%d' % int(9.0-amperes/capacity*8)
# graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color="%s:%s" % (color, wsc_color), label=label, penwidth="%s:%s" % (penwidth, penwidth))
graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color=color, label=label, penwidth=penwidth)
if con.is_redundancy_cable():
edge_style = 'dashed'
else:
edge_style = 'solid'
# graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color="%s:%s" % (color, wsc_color), label=label, penwidth="%s:%s" % (penwidth, penwidth))
graph.add_edge(con.from_plug.machine.name, con.to_plug.machine.name, color=color, label=label, penwidth=penwidth, style=edge_style)
for rack_id, rack in racks.items():
# sub = graph.add_subgraph(rack, name='cluster_%s' % rack_id, rank='same')
@ -534,3 +538,4 @@ def power_config_to_svg(power_config: PowerConfig, svg_file_path: Path, worst_ca
graph.layout(prog='dot')
graph.draw(svg_file_path)


@ -1,6 +1,7 @@
from typing import Union, List
from typing import Union, List, Optional, Tuple
from pathlib import Path
from enum import Enum
import json
import logging
import MySQLdb # sudo port install py-mysql; sudo apt install python-mysqldb or pip install mysqlclient
import time
@ -43,6 +44,7 @@ def is_machine_responding(machineName):
SqlQuery = str
Table = List[Tuple]
class SqlTableField():
@ -68,6 +70,10 @@ class SqlTableField():
self.is_autoinc_index = is_autoinc_index
ColumnId = str # eg 'matrix_size'
TableId = str # eg 'benchmark_results'
class ISqlDatabaseBackend(object):
def __init__(self):
pass
@ -79,6 +85,30 @@ class ISqlDatabaseBackend(object):
"""
raise NotImplementedError()
def query_select(self, columns: List[ColumnId], table: TableId, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
"""
performs a select query on the sql database and returns the results in the form of a list of tuples (one tuple per row, tuple values are the column values)
:param List[ColumnId] columns: the columns to select
:param TableId table: the name of the table to query
:param Optional[str] where_clause: the where clause for the query, eg "matrix_size > 100"
:param Optional[str] join_clause: the join clause for the query, eg "disables ON log.id = disables.disable_request_id"
:param bool distinct: whether to return distinct rows
:return: the results of the query
"""
raise NotImplementedError()
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
"""
performs an insert query on the sql database and returns the id of the inserted row
:param str table_id: the name of the table to insert into
:param List[str] fields: the list of fields to insert values into
:param List[tuple] values: the list of values to insert (one tuple per row, tuple values are the column values)
:return: the id of the inserted row
"""
raise NotImplementedError()
@abc.abstractmethod
def table_exists(self, table_name: str) -> bool:
"""returns true if the given table exists in the database
@ -115,6 +145,21 @@ class ISqlDatabaseBackend(object):
raise NotImplementedError()
def values_to_sql_string(values: List[tuple]) -> str:
'''converts a list of tuples of values into a string that can be used in a sql query, with proper escaping of string values
eg the list of tuples
[('alambix42', 'disabled to move the alambix42 to another rack'), ('alambix42', 'because I want to test quman')]
will be converted into the following string that can be used in a sql query :
"('alambix42', 'disabled to move the alambix42 to another rack'), ('alambix42', 'because I want to test quman')"
'''
sql_values = []
for value_tuple in values:
escaped_value_tuple = tuple(["'%s'" % str(v).replace("'", r"\'") for v in value_tuple])
sql_values.append(f'({", ".join(escaped_value_tuple)})')
return ', '.join(sql_values)
class RemoteMysqlDb(ISqlDatabaseBackend):
def __init__(self, db_server_fqdn, db_user, db_name):
"""
@ -135,10 +180,29 @@ class RemoteMysqlDb(ISqlDatabaseBackend):
"""
:param str sql_query: the sql query to perform
"""
self._conn.query(sql_query)
rows = self._conn.store_result()
cursor = self._conn.cursor()
cursor.execute(sql_query)
rows = cursor.fetchall()
logging.debug("RemoteMysqlDb.query:: results of query using cursor '%s': %s", sql_query, rows)
return rows
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{', '.join(columns)} FROM {table}"
if join_clause:
sql_query += f" JOIN {join_clause}"
if where_clause:
sql_query += f" WHERE {where_clause}"
return self.query(sql_query)
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
sql_query = sql_query + ";SELECT last_insert_id();"
logging.debug("RemoteMysqlDb.query_insert:: sql_query = '%s'", sql_query)
rows = self.query(sql_query)
last_insert_id = int(rows[0][0])
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
return last_insert_id
def table_exists(self, table_name: str) -> bool:
rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
assert len(rows) <= 1, f'Unexpected case: more than one ({len(rows)}) tables match the table name {table_name}.'
@ -154,6 +218,26 @@ class RemoteMysqlDb(ISqlDatabaseBackend):
raise NotImplementedError()
def json_to_table(json_str: str) -> Table:
# logging.debug("json_to_table:: json_str = '%s'", json_str)
if json_str == 'NULL':
return []
try:
json_data = json.loads(json_str)
except json.JSONDecodeError:
logging.error("json_to_table:: invalid json string: '%s'", json_str)
raise
assert isinstance(json_data, list), f'Expected a list of rows in the json string but got {type(json_data)}'
table = []
for row in json_data:
values = []
for column_value in row.values():
assert isinstance(column_value, (str, int, float)), f'Expected a string, int or float value for each column value in the json string but got {type(column_value)}'
values.append(column_value)
table.append(tuple(values))
return table
class SshAccessedMysqlDb(ISqlDatabaseBackend):
"""a mysql database server accessed using ssh instead of a remote mysql client
@ -187,9 +271,61 @@ class SshAccessedMysqlDb(ISqlDatabaseBackend):
if completed_process.returncode != 0:
logging.error(completed_process.stderr.decode(encoding='utf-8'))
assert False
rows = completed_process.stdout.decode('utf-8').split('\n')
stdout = completed_process.stdout.decode('utf-8')
logging.debug("SshAccessedMysqlDb.query:: results of query '%s': %s", sql_query, stdout)
rows = stdout.split('\n') if stdout != '' else []
return rows
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
# MariaDB [quman]> SELECT JSON_ARRAYAGG(JSON_OBJECT('toto', timestamp, 'iddd', user_id)) FROM log;
# +-------------------------------------------------------------------------------------------------------+
# | JSON_ARRAYAGG(JSON_OBJECT('toto', timestamp, 'iddd', user_id)) |
# +-------------------------------------------------------------------------------------------------------+
# | [{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}] |
# +-------------------------------------------------------------------------------------------------------+
# 1 row in set (0,006 sec)
# columns_list_str = ', '.join([f'"{col}"' for col in columns])
columns_list_str = ', '.join([f"'{col}', {col}" for col in columns])
columns_statement = f'JSON_ARRAYAGG(JSON_OBJECT({columns_list_str}))'
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{columns_statement} FROM {table}"
if join_clause:
sql_query += f" JOIN {join_clause}"
if where_clause:
sql_query += f" WHERE {where_clause}"
stdout = self.query(sql_query)
json_str = stdout[-2] # eg '[{"queue_machine": "gpuonly.q@alambix104.ipr.univ-rennes1.fr"}]'
assert isinstance(json_str, str), f'Expected a string as data line in the query output but got {type(json_str)}'
logging.debug("SshAccessedMysqlDb.query_select:: data line of query '%s': '%s'", sql_query, json_str)
# match = re.match(r'^\s*(?P<json_data>{[^}]*})\s*$', data_line)
# assert match, 'Unexpected output format for query "%s" : %s' % (sql_query, stdout)
# json_data = json.loads(match.group('json_data'))
table = json_to_table(json_str)
return table
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
logging.debug("SshAccessedMysqlDb.query_insert:: values = %s", values)
# INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_id, reason) VALUES ('2026-05-26T15:31:53.564287', 'graffy', 'alambix50.ipr.univ-rennes.fr', 'gpuonly.q@alambix104.ipr.univ-rennes1.fr', 'disable', 'quman-sync', 'synchronized with grid engine');SELECT JSON_OBJECT('toto', last_insert_id());
# Query OK, 1 row affected (0,046 sec)
# +---------------------------------------+
# | JSON_OBJECT('toto', last_insert_id()) |
# +---------------------------------------+
# | {"toto": 9} |
# +---------------------------------------+
# 1 row in set (0,002 sec)
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
sql_query = sql_query + ";SELECT JSON_OBJECT('toto', last_insert_id());"
logging.debug("SshAccessedMysqlDb.query_insert:: sql_query = '%s'", sql_query)
stdout = self.query(sql_query)
data_line = stdout[-2]
match = re.match(r'^\s*(?P<json_data>{\s*"toto"\s*:\s*(\d+)\s*})\s*$', data_line)
assert match
json_data = json.loads(match.group('json_data'))
last_insert_id = int(json_data['toto'])
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
return last_insert_id
def table_exists(self, table_name: str) -> bool:
rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
logging.debug('len(rows) = %d', len(rows))
@ -245,7 +381,7 @@ class SqliteDb(ISqlDatabaseBackend):
:param str database_name: the name of the database withing the sqlite database (eg "iprbench")
"""
self.sqlite_db_path = sqlite_db_path
self._cur = None
self._cur = None
check_same_thread = False
# this is to prevent the following error when run from apache/django : SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 139672342353664 and this is thread id 139672333960960
@ -253,7 +389,7 @@ class SqliteDb(ISqlDatabaseBackend):
# If set False, the returned connection may be shared across multiple threads. When using multiple threads with the same connection writing operations should be serialized by the user to avoid data corruption
# I hope it's safe here but I'm not 100% sure though. Anyway, if the database gets corrupt, it not a big deal since this memory resident database gets reconstructed from the sql file...
if sqlite_db_path != ':memory:' and not sqlite_db_path.exists():
if sqlite_db_path == ':memory:' or not sqlite_db_path.exists():
logging.debug('creating sqlite database in %s', sqlite_db_path)
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
else:
@ -274,6 +410,35 @@ class SqliteDb(ISqlDatabaseBackend):
self._con.commit()
return rows
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{', '.join(columns)} FROM {table}"
if join_clause:
sql_query += f" JOIN {join_clause}"
if where_clause:
sql_query += f" WHERE {where_clause}"
return self.query(sql_query)
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
"""
performs an insert query on the sql database and returns the id of the inserted row
:param str table_id: the name of the table to insert into
:param List[str] fields: the list of fields to insert values into
:param List[tuple] values: the list of values to insert (one tuple per row, tuple values are the column values)
:return: the id of the inserted row
"""
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
logging.debug("SqliteDb.query_insert:: sql_query = '%s'", sql_query)
logging.debug("SqliteDb.query_insert:: values = %s", values)
self._cur.execute(sql_query)
self._cur.execute('SELECT last_insert_rowid();')
rows = self._cur.fetchall()
self._con.commit()
assert len(rows) == 1, f'Unexpected number of rows ({len(rows)}).'
last_insert_id = int(rows[0][0])
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
return last_insert_id
def table_exists(self, table_name: str) -> bool:
rows = self.query(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
assert len(rows) <= 1, f'Unexpected case: more than one ({len(rows)}) tables match the table name {table_name}.'
@ -329,7 +494,7 @@ class SqlFile(SqliteDb):
# - the file is stored on a solid state disk
try:
os.remove(sqlite_db_path)
except BaseException:
except BaseException: # pylint: disable=broad-except
pass
super().__init__(sqlite_db_path)
with open(str(self._sql_file_path), 'r', encoding='utf8') as f: # str conversion has been added to support older versions of python in which open don't accept arguments of type Path


@ -40,6 +40,10 @@ def is_test_machine(name):
'physix13',
'physix14',
'physix15',
'alambix12',
'alambix13',
'alambix14',
'alambix15',
]
@ -315,7 +319,7 @@ def draw_machine_age_pyramid_graph(inventory):
:param Inventory inventory: the inventory database
"""
oldest_age = 20
oldest_age = 25
age_histogram = np.zeros(shape=(oldest_age))
rows = inventory.query("SELECT * FROM machines")
@ -328,7 +332,9 @@ def draw_machine_age_pyramid_graph(inventory):
if purchase_date is not None:
purchase_time = matplotlib.dates.date2num(purchase_date.date()) # noqa: F841
age = datetime.datetime.now() - purchase_date
age_histogram[age.days // 365] += 1
age_in_years = age.days // 365
if age_in_years < oldest_age:
age_histogram[age_in_years] += 1
# print(name, age)
fig, ax = plt.subplots()
@ -349,7 +355,7 @@ def draw_core_age_pyramid_graph(inventory):
:param Inventory inventory: the inventory database
"""
oldest_age = 20
oldest_age = 25
age_histogram = np.zeros(shape=(oldest_age))
rows = inventory.query("SELECT * FROM machines")
@ -362,7 +368,9 @@ def draw_core_age_pyramid_graph(inventory):
if purchase_date is not None:
purchase_time = matplotlib.dates.date2num(purchase_date.date()) # noqa: F841
age = datetime.datetime.now() - purchase_date
age_histogram[age.days // 365] += inventory.get_num_cores(name)
age_in_years = age.days // 365
if age_in_years < oldest_age:
age_histogram[age_in_years] += inventory.get_num_cores(name)
# print(name, age)
fig, ax = plt.subplots()

525
cocluto/quman.py Executable file

@ -0,0 +1,525 @@
#!/usr/bin/env python3
from abc import ABC, abstractmethod
import sys
from typing import List, Dict, Any, Union, Optional
import logging
import subprocess
import argparse
import re
import json
from datetime import datetime
from pathlib import Path
from cocluto.sqldb import ISqlDatabaseBackend, ISqlConnection, SqliteDb, SqlTableField, SshAccessedMysqlDb, Table
from cocluto.ClusterController.QstatParser import QstatParser
from cocluto.ClusterController.JobsState import JobsState
LogId = int # identifies a log entry in the database
DisableId = str # unique string that identifies the disable request eg auto.croconaus, manual.graffy, etc.
QueueId = str # identifies the queue eg main.q
QueueMachineId = str # identifies the queue machine eg main.q@alambix42.ipr.univ-rennes.fr
HostFqdn = str # fully qualified domain name of a host eg alambix42.ipr.univ-rennes.fr
UserId = str # unix user id of a user eg graffy
def is_queue_machine_id(s: str) -> bool:
"""returns True if the given string is a QueueMachineId (i.e., it contains '@') and False otherwise."""
# if the queue_machine contains '@', we consider it as a QueueMachineId and return it as is
return re.match(r'^[a-zA-Z0-9_\.]+@[a-zA-Z0-9_\-\.]+$', s) is not None
class QueuesStatus():
is_enabled: Dict[QueueMachineId, bool]
def __init__(self, queues_status_file_path: Path = None):
self.is_enabled = {}
if queues_status_file_path is not None:
self.load_from_json(queues_status_file_path)
def load_from_json(self, path: Path):
with open(path, 'r', encoding='utf-8') as f:
self.is_enabled = json.load(f)
def add_queue(self, queue_machine: QueueMachineId, is_enabled: bool):
self.is_enabled[queue_machine] = is_enabled
def print(self):
for queue_machine, is_enabled in self.is_enabled.items():
print(f"{queue_machine}: {'enabled' if is_enabled else 'disabled'}")
def save_as_json(self, path: Path):
with open(path, 'w', encoding='utf-8') as f:
json.dump(self.is_enabled, f, indent=2)
class IGridEngine(ABC):
@abstractmethod
def disable_queue_machine(self, queue_machine: QueueMachineId):
pass
@abstractmethod
def enable_queue_machine(self, queue_machine: QueueMachineId):
pass
@abstractmethod
def get_status(self) -> QueuesStatus:
pass
class MockGridEngine(IGridEngine):
queues_status: QueuesStatus
def __init__(self, queues_status: QueuesStatus):
self.queues_status = queues_status
def disable_queue_machine(self, queue_machine: QueueMachineId):
print(f"Mock disable queue {queue_machine}")
if queue_machine not in self.queues_status.is_enabled:
raise RuntimeError(f"Queue {queue_machine} not found in queues {list(self.queues_status.is_enabled.keys())}")
assert self.queues_status.is_enabled[queue_machine], f"Queue {queue_machine} is already disabled"
self.queues_status.is_enabled[queue_machine] = False
def enable_queue_machine(self, queue_machine: QueueMachineId):
print(f"Mock enable queue {queue_machine}")
if queue_machine not in self.queues_status.is_enabled:
raise RuntimeError(f"Queue machine {queue_machine} not found in queues {list(self.queues_status.is_enabled.keys())}")
assert not self.queues_status.is_enabled[queue_machine], f"Queue machine {queue_machine} is already enabled"
self.queues_status.is_enabled[queue_machine] = True
def get_status(self) -> QueuesStatus:
return self.queues_status
class Sge(IGridEngine):
dry_run: bool
qmod_is_available: Optional[bool]
def __init__(self, dry_run: bool = False):
self.dry_run = dry_run
self.qmod_is_available = None
def run_qmod(self, args):
"""runs qmod with the given arguments."""
cmd = ["qmod"] + args
if self.dry_run:
print(f"Dry run: {' '.join(cmd)}")
else:
self.check_qmod_availability()
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"qmod command failed: {e}") from e
def check_qmod_availability(self):
if self.qmod_is_available is None:
self.qmod_is_available = False
# check that qmod command is available
try:
subprocess.run(["qmod", "-help"], check=True, capture_output=True)
except FileNotFoundError as exc:
raise RuntimeError("qmod command not found. Please make sure that the grid engine client is installed and qmod command is available in the PATH.") from exc
self.qmod_is_available = True
def disable_queue_machine(self, queue_machine: QueueMachineId):
self.run_qmod(["-d", queue_machine])
def enable_queue_machine(self, queue_machine: QueueMachineId):
self.run_qmod(["-e", queue_machine])
def get_status(self) -> QueuesStatus:
process = subprocess.run(['qstat', '-f', '-u', '*', '-pri'], check=True, capture_output=True)
# parse the qstat output to find out which queue machines are currently disabled
queues_status = QueuesStatus()
qstat_parser = QstatParser()
jobs_state: JobsState = qstat_parser.parse_qstat_output(process.stdout.decode(), cluster_domain='ipr.univ-rennes.fr')
queue_machines = jobs_state.get_queue_machines()
for queue_machine in queue_machines.values():
queues_status.add_queue(queue_machine.get_name(), not queue_machine.is_disabled())
return queues_status
def init_db(db_backend: ISqlDatabaseBackend):
with db_backend.connect() as conn:
# a table storing the log of actions (queue activation or deactivation)
if not conn.table_exists('log'):
fields = [
SqlTableField('id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the modification', is_autoinc_index=True),
SqlTableField('timestamp', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this modification has been made'),
SqlTableField('user_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the id of user performing the action (unix user id)'),
SqlTableField('host_fqdn', SqlTableField.Type.FIELD_TYPE_STRING, 'the fully qualified domain name of the host on which the action is performed'),
SqlTableField('queue_machines', SqlTableField.Type.FIELD_TYPE_STRING, 'the comma separated list of queue machines that were modified'),
SqlTableField('action', SqlTableField.Type.FIELD_TYPE_STRING, 'the action performed: "disable" or "enable"'),
SqlTableField('disable_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'),
SqlTableField('reason', SqlTableField.Type.FIELD_TYPE_STRING, 'the reason for the modification'),
]
conn.create_table('log', fields)
# a table storing the current disable requests
if not conn.table_exists('disables'):
fields = [
SqlTableField('disable_request_id', SqlTableField.Type.FIELD_TYPE_INT, 'log.id of the disable action that led to this state'),
SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue machine that was disabled'),
]
conn.create_table('disables', fields)
# a table storing the current queues
if not conn.table_exists('queues'):
fields = [
SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue'),
]
conn.create_table('queues', fields)
def create_db_backend(db_def: str) -> ISqlDatabaseBackend:
# db_def: eg '{"type": "mysql_via_ssh", "params": {"sql_server_fqdn": "db.server.com", "db_user": "db_user", "db_name": "quman_db", "ssh_user": "ssh_user"}}' or '{"type": "sqlite", "params": {"sqlite_file_path": "./quman.db"}}'
db_def = json.loads(db_def)
db_type = db_def['type']
db_params = db_def['params']
if db_type == 'mysql_via_ssh':
sql_server_fqdn = db_params['sql_server_fqdn'] # the fully qualified domain name of the mysql server hosting the database eg 'alambix-master.ipr.univ-rennes.fr'
db_user = db_params['db_user'] # eg 'qumandbw'
db_name = db_params['db_name'] # the name of the database, eg 'quman'
ssh_user = db_params['ssh_user'] # the ssh user which has the privileges to access the mysql database, eg 'qumandbw'
backend = SshAccessedMysqlDb(sql_server_fqdn, db_user, db_name, ssh_user)
command = f'ssh "{ssh_user}@{sql_server_fqdn}" "hostname"'
completed_process = subprocess.run(command, shell=True, check=False, capture_output=True)
if completed_process.returncode != 0:
raise RuntimeError(f"Failed to connect to the mysql server via ssh with command '{command}'. Please check the connection parameters and make sure that the ssh user has access to the server. Error message: {completed_process.stderr.decode()}")
elif db_type == 'sqlite':
db_path = Path(db_params['sqlite_file_path'])  # eg ./quman_test/quman.sqlite
backend = SqliteDb(db_path)
else:
raise ValueError("Unsupported database type: %s" % db_type)
init_db(backend)
return backend
class DisableRequest:
log_id: int  # id of the row of the log table that recorded this disable request
user_id: UserId # the id of the user who requested the disable action (unix user id)
host_fqdn: HostFqdn # the fully qualified domain name of the host on which the disable action was requested
queue_machines: List[QueueMachineId] # the names of the queue machines that were modified
reason: str
disable_id: DisableId
timestamp: datetime
def __init__(self, log_id: int, user_id: UserId, host_fqdn: HostFqdn, queue_machines: List[QueueMachineId], reason: str, disable_id: DisableId, timestamp: datetime):
self.log_id = log_id
self.user_id = user_id
self.host_fqdn = host_fqdn
self.queue_machines = queue_machines
self.reason = reason
self.disable_id = disable_id
self.timestamp = timestamp
def as_dict(self) -> Dict[str, Any]:
return {
"log_id": self.log_id,
"user_id": self.user_id,
"host_fqdn": self.host_fqdn,
"queue_machines": self.queue_machines,
"reason": self.reason,
"disable_id": self.disable_id,
"timestamp": self.timestamp.isoformat(),
}
class QueueDisableState:
state: Dict[QueueMachineId, List[DisableRequest]]
def __init__(self):
self.state = {}
def set_queue_machine_state(self, queue_machine: QueueMachineId, disable_requests: List[DisableRequest]):
assert all(isinstance(dr, DisableRequest) for dr in disable_requests)
self.state[queue_machine] = disable_requests
def as_dict(self) -> Dict[str, Any]:
self_as_dict = {}
for queue_machine, disable_requests in self.state.items():
self_as_dict[queue_machine] = {
"disable_requests": [dr.as_dict() for dr in disable_requests]
}
return self_as_dict
class QueueManager:
sql_conn: ISqlConnection
grid_engine: IGridEngine
def __init__(self, sql_conn: ISqlConnection, grid_engine: IGridEngine = None):
assert isinstance(sql_conn, ISqlConnection)
self.sql_conn = sql_conn
if grid_engine is None:
grid_engine = Sge()
self.grid_engine = grid_engine
def log_modification(self, queue_machines: List[QueueMachineId], action: str, disable_id: DisableId, reason: str) -> LogId:
assert isinstance(queue_machines, list)
assert action in ["disable", "enable"], "Action must be either 'disable' or 'enable'"
userid = subprocess.check_output(['whoami']).decode().strip()
host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip()
timestamp = datetime.now().isoformat()
log_id = self.sql_conn.query_insert(table_id="log", fields=['timestamp', 'user_id', 'host_fqdn', 'queue_machines', 'action', 'disable_id', 'reason'], values=[(timestamp, userid, host_fqdn, ','.join(queue_machines), action, disable_id, reason)])
return log_id
def get_disable_requests(self, queue_machine: QueueMachineId) -> List[DisableRequest]:
results_table = self.sql_conn.query_select(['log.id', 'log.user_id', 'log.host_fqdn', 'log.queue_machines', 'log.reason', 'log.disable_id', 'log.timestamp'], join_clause="disables ON log.id = disables.disable_request_id", table="log", where_clause=f"disables.queue_machine = '{queue_machine}' AND log.action = 'disable'")
assert isinstance(results_table, Table), f"Expected results_table to be of type Table but got {type(results_table)}"
disable_requests = []
for row in results_table.rows:
log_id = row[0]
user_id = row[1]
host_fqdn = row[2]
queue_machines = row[3].split(',')
assert queue_machine in queue_machines, f"Disable request {log_id} should concern queue machine {queue_machine}"
reason = row[4]
disable_id = row[5]
timestamp = datetime.fromisoformat(row[6])
disable_requests.append(DisableRequest(log_id=log_id, user_id=user_id, host_fqdn=host_fqdn, queue_machines=queue_machines, reason=reason, disable_id=disable_id, timestamp=timestamp))
return disable_requests
def request_queue_machines_deactivation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_disable: bool = True):
logging.info("Requesting deactivation of queue machines %s with disable id '%s' for reason: %s", queue_machines, disable_id, reason)
all_disable_requests = {}
for queue_machine in queue_machines:
all_disable_requests[queue_machine] = self.get_disable_requests(queue_machine)
disable_requests = all_disable_requests[queue_machine]
for dr in disable_requests:
if dr.disable_id == disable_id:
raise RuntimeError(f"Disable id {disable_id} has already requested deactivation of queue {queue_machine} for reason '{dr.reason}' at {dr.timestamp.isoformat()}. Cannot request deactivation again without reactivating first.")
for queue_machine in queue_machines:
if perform_disable:
disable_requests = all_disable_requests[queue_machine]
if len(disable_requests) == 0:
# queue is currently active, we can disable it
self.grid_engine.disable_queue_machine(queue_machine)
disable_log_id = self.log_modification(queue_machines, "disable", disable_id, reason)
for queue_machine in queue_machines:
self.sql_conn.query(f"INSERT INTO disables (disable_request_id, queue_machine) VALUES ({disable_log_id}, '{queue_machine}');")
def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_enable: bool = True):
for queue_machine in queue_machines:
disable_requests = self.get_disable_requests(queue_machine)
dr_to_remove = None  # the disable request to remove
for dr in disable_requests:
if dr.disable_id == disable_id:
dr_to_remove = dr
break
if dr_to_remove is None:
raise RuntimeError(f"Disable id {disable_id} has not requested deactivation of queue {queue_machine}. Cannot request activation without a prior deactivation.")
if perform_enable:
if len(disable_requests) == 1:
# queue is currently disabled and there is only one disable reason, we can enable it
self.grid_engine.enable_queue_machine(queue_machine)
self.log_modification(queue_machines, "enable", disable_id, reason)
for queue_machine in queue_machines:
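# select the row by disable id: dr_to_remove only refers to the last queue machine checked above
logging.debug("deleting disable request '%s' on %s", disable_id, queue_machine)
self.sql_conn.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}' AND disable_request_id IN (SELECT id FROM log WHERE disable_id = '{disable_id}' AND action = 'disable');")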
def synchronize_with_grid_engine(self):
"""synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat."""
logging.info('fixing potential inconsistencies (caused by actors using qmod instead of quman) of quman database with the current state of the cluster queues')
qs = self.grid_engine.get_status()
db_queues = set()
results_table = self.sql_conn.query_select(columns=["queue_machine"], table="queues")
logging.debug("synchronize_with_grid_engine: results of query: %s", results_table)
for row in results_table.rows:
assert len(row) == 1, "Each row should have only one column (queue_machine) but got row='%s' (len=%d)" % (str(row), len(row))
db_queues.add(row[0])
for queue_machine, is_enabled in qs.is_enabled.items():
if queue_machine not in db_queues:
# if the queue is not in the database, we add it
logging.warning("Queue %s is not in the database, adding it.", queue_machine)
self.sql_conn.query(f"INSERT INTO queues (queue_machine) VALUES ('{queue_machine}');")
else:
db_queues.remove(queue_machine) # we remove it from the set of queues in the database to keep track of the queues that are in the database but not in the grid engine
disable_requests = self.get_disable_requests(queue_machine)
if not is_enabled and len(disable_requests) == 0:
# queue is disabled in the grid engine but there is no disable request in the database: we add a disable request with disable_id "quman-sync" and reason "synchronized with grid engine"
disable_id = "quman-sync"
logging.warning('adding disable %s on %s because this queue is disabled in grid engine', disable_id, queue_machine)
self.request_queue_machines_deactivation([queue_machine], disable_id, "synchronized with grid engine", perform_disable=False)
assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none."
elif is_enabled and len(disable_requests) > 0:
# queue is enabled in the grid engine but there are disable requests in the database: we remove all disable requests for this queue, logging the reason "synchronized with grid engine"
for dr in disable_requests:
logging.warning('removing disable %s on %s because this queue is enabled in grid engine', dr.disable_id, queue_machine)
self.request_queue_machines_activation([queue_machine], dr.disable_id, "synchronized with grid engine", perform_enable=False)
assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests."
for queue_machine in db_queues:
logging.warning("Queue %s is in the database but not in the grid engine. Removing it from the database.", queue_machine)
self.sql_conn.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';")
self.sql_conn.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';")
logging.info('quman database is consistent with the current state of cluster queues')
def get_state(self) -> QueueDisableState:
"""returns the state of the queues."""
# get the list of queue names from the disables table in the database
table = self.sql_conn.query_select(columns=["queue_machine"], table="disables", distinct=True)
for row in table.rows:
assert len(row) == 1, "Each row should have only one column (queue_machine)"
queue_machines = [row[0] for row in table.rows]
state = QueueDisableState()
for queue_machine in queue_machines:
disable_requests = self.get_disable_requests(queue_machine)
state.set_queue_machine_state(queue_machine, disable_requests)
return state
def get_queue_machines(self, queue_machine: Union[QueueMachineId, QueueId]) -> List[QueueMachineId]:
logging.debug("Getting queue machines for queue_machine identifier '%s'", queue_machine)
queue_machines = []
if is_queue_machine_id(queue_machine):
queue_machines = [queue_machine]
else:
status = self.grid_engine.get_status()
queue_machines = [q for q in status.is_enabled if q.startswith(queue_machine)]
assert len(queue_machines) > 0, f"No queue machine found for queue identifier '{queue_machine}'"
logging.debug("Found queue machines %s for queue_machine identifier '%s'", queue_machines, queue_machine)
return queue_machines
class CustomHelpFormatter(
argparse.ArgumentDefaultsHelpFormatter, # to get the default values in the help message
argparse.RawDescriptionHelpFormatter # to get the newlines in the description and epilog to be properly formatted in the help message
):
pass
def main():
parser = argparse.ArgumentParser(
description="quman: manage queue disable/enable requests with logging",
prog="quman",
epilog="Example usages:\n"
" quman add-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance'\n"
" quman remove-disable-request main.q@node42 --disable-id croconaus --reason 'maintenance completed'\n"
" quman add-disable-request main.q --disable-id admin.graffy.bug4242 --reason 'preparing cluster to shutdown for power shortage, see bug 4242'\n"
" quman show-disable-requests main.q@node42\n"
" quman show-disable-requests\n"
" quman --db-def='{\"type\": \"sqlite\", \"params\": {\"sqlite_file_path\": \"/tmp/quman.sqlite\"} }' show-disable-requests\n"
" quman --db-def='{\"type\": \"mysql_via_ssh\", \"params\": {\"sql_server_fqdn\": \"alambix-master.ipr.univ-rennes.fr\", \"db_user\": \"qumandbw\", \"db_name\": \"quman\", \"ssh_user\": \"qumandbw\"} }' show-disable-requests\n",
formatter_class=CustomHelpFormatter
)
subparsers = parser.add_subparsers(dest="action", help="Action to perform")
parser.add_argument("--test", action="store_true", help="Run in test mode with MockGridEngine and a local sqlite database. This is meant for testing and development purposes and should not be used in production.")
default_db_def = {
"type": "mysql_via_ssh",
"params": {
"sql_server_fqdn": "alambix-master.ipr.univ-rennes.fr",
"db_user": "qumandbw",
"db_name": "quman",
"ssh_user": "qumandbw"}}
parser.add_argument("--json", action="store_true", help="Output results in JSON format.")
parser.add_argument("--db-def", type=str, default=json.dumps(default_db_def), help="the definition in json format of the database storing the disable requests.")
# the db synchronization mechanism needs some requirements that are not always available (sge commands such as qmod and qstat, write access on quman's database), as for example on simpaweb; in such environments, --disable-sync allows quman to work for query-only actions such as show-disable-requests
parser.add_argument("--disable-sync", action="store_true", default=False, help="if set, skip the database update that fixes potential inconsistencies with the current state of the queues before executing the requested action")
# add-disable-request action
add_parser = subparsers.add_parser("add-disable-request", help="adds a disable request to a queue")
add_parser.add_argument("queue", help="Queue to disable (e.g., main.q@node42.univ-rennes.fr, main.q, etc.)")
add_parser.add_argument("--disable-id", required=True, help="id for the disable request (e.g., croconaus, manual.graffy, etc.)")
add_parser.add_argument("--reason", required=True, help="Reason for the disabling")
# remove-disable-request action
remove_parser = subparsers.add_parser("remove-disable-request", help="removes a disable request on a queue")
remove_parser.add_argument("queue", help="Queue to enable (e.g., main.q@node42.univ-rennes.fr, main.q, etc.)")
remove_parser.add_argument("--disable-id", required=True, help="id of the disable request to remove")
remove_parser.add_argument("--reason", default="Manual removal", help="Reason for the removal")
# show-disable-requests action
show_parser = subparsers.add_parser("show-disable-requests", help="Show disable requests")
show_parser.add_argument("queue", nargs="?", default=None, help="Optional: specific queue to show (if omitted, shows all)")
# update-db action
update_db_parser = subparsers.add_parser("update-db", help="updates quman database to synchronize with the actual state of the queues in the grid engine by querying qstat. Only needed if the grid engine state has been modified outside of quman (e.g., by manually running qmod). This is automatically done at the beginning of each quman command, so there should be no need to run this command manually in normal usage.") # noqa: F841 pylint:disable=unused-variable
try:
args = parser.parse_args()
if args.action is None:
parser.print_help()
sys.exit(1)
test_mode = args.test
if test_mode:
# logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.warning("Running in test mode with MockGridEngine. No actual queue will be enabled or disabled.")
else:
logging.basicConfig(level=logging.INFO)
if test_mode:
queues_status_file_path = Path('./quman_test/queues_status.json')
if queues_status_file_path.exists():
logging.info("Loading queues status from %s", queues_status_file_path)
qs = QueuesStatus(queues_status_file_path)
else:
logging.info("No queues status file found at %s. Initializing with default queues status.", queues_status_file_path)
qs = QueuesStatus()
for node_id in range(40, 44):
qs.add_queue(f'main.q@alambix{node_id}.ipr.univ-rennes.fr', True)
qs.add_queue('gpuonly.q@alambix42', True)
grid_engine = MockGridEngine(qs)
else:
grid_engine = Sge()
db_backend = create_db_backend(args.db_def)
with db_backend.connect() as conn:
assert isinstance(conn, ISqlConnection), f"Expected db_backend.connect() to return an ISqlConnection but got {type(conn)}"
logging.debug("Connected to database successfully with connection %s", conn)
quman = QueueManager(conn, grid_engine)
if not args.disable_sync:
# fix potential inconsistencies in quman db regarding the current state of the queues
quman.synchronize_with_grid_engine()
if args.action == "add-disable-request":
queue_machines = quman.get_queue_machines(args.queue)
quman.request_queue_machines_deactivation(queue_machines, args.disable_id, args.reason)
elif args.action == "remove-disable-request":
queue_machines = quman.get_queue_machines(args.queue)
quman.request_queue_machines_activation(queue_machines, args.disable_id, args.reason)
elif args.action == "show-disable-requests":
if args.queue is None:
# Show all disable requests
state = quman.get_state()
else:
# Show disable requests for a specific queue
queue_machines = quman.get_queue_machines(args.queue)
state = QueueDisableState()
for queue_machine in queue_machines:
disable_requests = quman.get_disable_requests(queue_machine)
state.set_queue_machine_state(queue_machine, disable_requests)
if args.json:
print(json.dumps(state.as_dict(), indent=2))
else:
for queue_machine, disable_requests in state.state.items():
print(f"Queue machine: {queue_machine}")
for dr in disable_requests:
print(f" Disable request {dr.log_id} by {dr.user_id} on {dr.host_fqdn} at {dr.timestamp.isoformat()} with disable id '{dr.disable_id}' for reason: {dr.reason}")
elif args.action == "update-db":
quman.synchronize_with_grid_engine()
if test_mode:
grid_engine.get_status().save_as_json(queues_status_file_path)
except RuntimeError as e:
sys.stderr.write(f"ERROR: {e}\n")
sys.exit(1)
if __name__ == "__main__":
main()
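
The request-counting rule that `request_queue_machines_deactivation` and `request_queue_machines_activation` apply above (a queue machine is actually disabled on its first disable request and re-enabled only when its last request is removed) can be sketched in isolation. This is a minimal in-memory sketch, not quman's implementation: `DisableLedger` and its method names are hypothetical, and plain dicts stand in for the `disables` table and for the grid engine.

```python
from typing import Dict, Set


class DisableLedger:
    """Hypothetical in-memory stand-in for quman's 'disables' table and grid engine."""

    def __init__(self) -> None:
        # queue machine -> disable ids that currently hold it disabled
        self.requests: Dict[str, Set[str]] = {}
        # queue machine -> whether the simulated grid engine has it enabled
        self.enabled: Dict[str, bool] = {}

    def add_queue(self, queue_machine: str) -> None:
        self.requests[queue_machine] = set()
        self.enabled[queue_machine] = True

    def add_disable_request(self, queue_machine: str, disable_id: str) -> None:
        holders = self.requests[queue_machine]
        if disable_id in holders:
            raise RuntimeError(f"{disable_id} already disabled {queue_machine}")
        if not holders:
            # first request: the only point where the queue is actually disabled
            self.enabled[queue_machine] = False
        holders.add(disable_id)

    def remove_disable_request(self, queue_machine: str, disable_id: str) -> None:
        holders = self.requests[queue_machine]
        if disable_id not in holders:
            raise RuntimeError(f"{disable_id} has no disable request on {queue_machine}")
        holders.remove(disable_id)
        if not holders:
            # last request removed: the queue can be enabled again
            self.enabled[queue_machine] = True


ledger = DisableLedger()
ledger.add_queue("main.q@node42")
ledger.add_disable_request("main.q@node42", "croconaus")
ledger.add_disable_request("main.q@node42", "manual.graffy")
ledger.remove_disable_request("main.q@node42", "croconaus")
still_disabled = not ledger.enabled["main.q@node42"]  # one request remains
ledger.remove_disable_request("main.q@node42", "manual.graffy")
enabled_again = ledger.enabled["main.q@node42"]
```

Keeping one entry per (queue machine, disable id) pair is what lets independent actors, such as an automatic tool and an administrator, disable the same queue machine without re-enabling it under each other.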

cocluto/sqldb.py Normal file

@ -0,0 +1,786 @@
from typing import Union, List, Optional, Tuple
from pathlib import Path
from enum import Enum
import json
import logging
import re
import os
import abc
import sqlite3
from .mysql2sqlite import mysql_to_sqlite
import subprocess
import traceback
import pexpect
SqlQuery = str
class Table:
'''a table is a list of rows, where each row is a tuple of column values
'''
column_names: Optional[List[str]]
rows: List[Tuple]
def __init__(self, column_names: Optional[List[str]], rows: List[Tuple]):
self.column_names = column_names
self.rows = rows
def get_value(self, row: int, col: Union[int, str]) -> Union[str, int, float]:
'''returns the value at the given row and column in this table
:param row: the index of the row (0-based)
:param col: the index or name of the column
'''
if isinstance(col, str):
assert self.column_names is not None, 'Cannot get value by column name because this table does not have column names'
try:
col_index = self.column_names.index(col)
except ValueError as exc:
raise ValueError(f'Column name "{col}" not found in table columns {self.column_names}') from exc
col = col_index
assert isinstance(col, int), f'Unexpected type for col: expected int but got {type(col)}'
assert 0 <= row < len(self.rows), f'Row index {row} is out of bounds for table with {len(self.rows)} rows'
assert 0 <= col < len(self.rows[row]), f'Column index {col} is out of bounds for table with {len(self.rows[row])} columns'
return self.rows[row][col]
def check_integrity(self):
'''checks that all rows in this table have the same number of columns and that the number of column names (if any) matches the number of columns in the rows
'''
num_cols = None
for row in self.rows:
if num_cols is None:
num_cols = len(row)
else:
assert len(row) == num_cols, f'All rows in the table should have the same number of columns but got a row with {len(row)} columns while previous rows have {num_cols} columns'
if self.column_names is not None:
assert len(self.column_names) == num_cols, f'Number of column names {len(self.column_names)} does not match number of columns in rows {num_cols}'
@property
def num_rows(self) -> int:
return len(self.rows)
@property
def num_cols(self) -> int:
nc = len(self.rows[0]) if len(self.rows) > 0 else 0
if self.column_names is not None:
assert len(self.column_names) == nc, f'Number of column names {len(self.column_names)} does not match number of columns in rows {nc}'
return nc
class SqlTableField():
'''description of a field of a sql table
'''
class Type(Enum):
FIELD_TYPE_STRING = 0
FIELD_TYPE_INT = 1
FIELD_TYPE_FLOAT = 2
FIELD_TYPE_TIME = 3
name: str # the name of the field, eg 'matrix_size'
field_type: Type  # the type of the field, eg Type.FIELD_TYPE_INT
description: str  # the description of the field, eg 'the size n of the n*n matrix'
is_autoinc_index: bool  # indicates if this field is used as an autoincrement index in the table
def __init__(self, name: str, field_type: Type, description: str, is_autoinc_index=False):
if is_autoinc_index:
assert field_type == SqlTableField.Type.FIELD_TYPE_INT, 'only an integer field can be used as an autoincrement table index'
self.name = name
self.field_type = field_type
self.description = description
self.is_autoinc_index = is_autoinc_index
ColumnId = str # eg 'matrix_size'
TableId = str # eg 'benchmark_results'
class ISqlConnection(object):
def __init__(self):
pass
@abc.abstractmethod
def query(self, sql_query: SqlQuery) -> List[str]:
"""
:param str sql_query: the sql query to perform
"""
raise NotImplementedError()
def query_select(self, columns: List[ColumnId], table: TableId, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
"""
performs a select query on the sql database and returns the results as a Table (one tuple per row, tuple values are the column values)
:param List[ColumnId] columns: the columns to select
:param TableId table: the name of the table to query
:param Optional[str] where_clause: the where clause for the query, eg "matrix_size > 100"
:param Optional[str] join_clause: the join clause for the query, eg "disables ON log.id = disables.disable_request_id"
:param bool distinct: whether to return distinct rows
:return: the results of the query
"""
raise NotImplementedError()
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
"""
performs an insert query on the sql database and returns the id of the inserted row
:param str table_id: the name of the table to insert into
:param List[str] fields: the list of fields to insert values into
:param List[tuple] values: the list of values to insert (one tuple per row, tuple values are the column values)
:return: the id of the inserted row
"""
raise NotImplementedError()
@abc.abstractmethod
def table_exists(self, table_name: str) -> bool:
"""returns true if the given table exists in the database
"""
raise NotImplementedError()
@abc.abstractmethod
def create_table(self, table_name: str, fields: List[SqlTableField]):
"""creates the table in this sql database
"""
raise NotImplementedError()
@abc.abstractmethod
def delete_table(self, table_name: str):
"""deletes the given table in this sql database
"""
raise NotImplementedError()
# @abc.abstractmethod
# def add_table_row(self, table_name: str, values: List[SqlTableField]):
# """creates the table in this sql database
# """
# raise NotImplementedError()
@abc.abstractmethod
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
"""returns the sql directive for the declaration of the given table field (eg "`matrix_size` real NOT NULL COMMENT 'the size of the matrix'")
"""
raise NotImplementedError()
@abc.abstractmethod
def dump(self, sql_file_path: Path):
"""dumps this database into the given sql file"""
raise NotImplementedError()
class ISqlDatabaseBackend(object):
def __init__(self):
pass
@abc.abstractmethod
def connect(self) -> ISqlConnection:
"""returns a connection to this sql database backend that can be used to perform sql queries on the database"""
raise NotImplementedError()
def values_to_sql_string(values: List[tuple]) -> str:
'''converts a list of tuples of values into a string that can be used in a sql query, with proper escaping of string values
eg the list of tuples
[('alambix42', 'disabled to move the alambix42 to another rack'), ('alambix42', 'because I want to test quman')]
will be converted into the following string that can be used in a sql query :
"('alambix42', 'disabled to move the alambix42 to another rack'), ('alambix42', 'because I want to test quman')"
'''
sql_values = []
for value_tuple in values:
escaped_value_tuple = tuple(["'%s'" % str(v).replace("'", r"\'") for v in value_tuple])
sql_values.append(f'({", ".join(escaped_value_tuple)})')
return ', '.join(sql_values)
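
Where a DB-API driver is available, placeholders are an alternative to building the VALUES string by hand as `values_to_sql_string` does. The sketch below uses the standard library `sqlite3` module with an in-memory database. It is an illustration under assumptions, not part of cocluto: the two-column `log` table here is a reduced, hypothetical version of quman's table, and the approach may not carry over to a backend that sends query text through ssh.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log (queue_machines TEXT, reason TEXT)")

rows = [
    ("alambix42", "disabled to move the alambix42 to another rack"),
    ("alambix42", "it's a test of quman"),  # contains a single quote
]
# '?' placeholders let the driver handle quoting, so no manual escaping is needed
conn.executemany("INSERT INTO log (queue_machines, reason) VALUES (?, ?)", rows)

stored = conn.execute("SELECT reason FROM log ORDER BY rowid").fetchall()
conn.close()
```

The second row shows the case that manual escaping has to get right for each SQL dialect: the reason text contains a single quote and is stored unchanged.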
# class RemoteMysqlDb(ISqlDatabaseBackend):
# def __init__(self, db_server_fqdn, db_user, db_name):
# """
# :param str db_server_fqdn: the fully qualified domain name of the server hosting the database, eg simpatix10.univ-rennes1.fr
# :param str db_user: the user for accessing the inventory database, eg simpadb_reader
# :param str db_name: the name of the inventory database, eg simpadb
# """
# self.db_server_fqdn = db_server_fqdn
# self.db_user = db_user
# self.db_name = db_name
# self._connect()
# def _connect(self):
# self._conn = MySQLdb.connect(self.db_server_fqdn, self.db_user, '', self.db_name)
# assert self._conn
# def query(self, sql_query) -> List[str]:
# """
# :param str sql_query: the sql query to perform
# """
# cursor = self._conn.cursor()
# cursor.execute(sql_query)
# rows = cursor.fetchall()
# logging.debug("RemoteMysqlDb.query:: results of query using cursor '%s': %s", sql_query, rows)
# return rows
# def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
# sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{', '.join(columns)} FROM {table}"
# if join_clause:
# sql_query += f" JOIN {join_clause}"
# if where_clause:
# sql_query += f" WHERE {where_clause}"
# return self.query(sql_query)
# def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
# sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
# sql_query = sql_query + ";SELECT last_insert_id();"
# logging.debug("RemoteMysqlDb.query_insert:: sql_query = '%s'", sql_query)
# rows = self.query(sql_query)
# last_insert_id = int(rows[0][0])
# assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
# return last_insert_id
# def table_exists(self, table_name: str) -> bool:
# rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
# assert len(rows) <= 1, f'Unexpected case: more than one ({len(rows)}) tables match the table name {table_name}.'
# return len(rows) == 1
# def create_table(self, table_name: str, fields: List[SqlTableField]):
# raise NotImplementedError()
# def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
# return f'`{field_name}` {field_sql_type} COMMENT \'{field_description}\''
# def dump(self, sql_file_path: Path):
# raise NotImplementedError()
def json_to_table(json_str: str) -> Table:
assert isinstance(json_str, str), f'Expected a string as input for json_to_table but got {type(json_str)}' # eg '[{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}]'
# logging.debug("json_to_table:: json_str = '%s'", json_str)
if json_str == 'NULL':
return Table(column_names=None, rows=[])
try:
json_data = json.loads(json_str)
except json.JSONDecodeError:
logging.error("json_to_table:: invalid json string: '%s'", json_str)
raise
assert isinstance(json_data, list), f'Expected a list of rows in the json string but got {type(json_data)}'
table = []
for row in json_data:
row_values = []
for value in row.values():
assert isinstance(value, (str, int, float)), f'Expected a string, int or float value for each column value in the json string but got {type(value)}'
row_values.append(value)
table.append(tuple(row_values))
return Table(column_names=None, rows=table)
def print_line_chars(line: str):
for char in line:
# char identifier, eg 'ESC' for 0x1b
char_name = {
'\x1b': 'ESC',
'\r': 'CR',
'\n': 'LF'
}.get(char, repr(char))
logging.debug('char = \'%s\' (hex=%s)', char_name, char.encode("utf-8").hex())
def strip_vt100(text):
# ANSI: Standardized, starts with \x1b[.
# VT100: Includes private sequences like \x1b(B, \x1b(A, \x1b>, etc., which are not ANSI but widely supported in terminal emulators.
# Matches all VT100 escape sequences:
# - CSI: \x1b[... (e.g., \x1b[0;1m)
# - Fe: \x1b(..., \x1b), \x1b> (e.g., \x1b(B, \x1b>A)
# - Single-character: \x1b[@-Z\\_-]
vt100_escape = re.compile(r'\x1b(?:[@-Z\\_-]|\[[0-?]*[ -/]*[@-~]|\([A-Za-z]|\)[A-Za-z])')
return vt100_escape.sub('', text)
def parse_mysql_stdout(stdout: str) -> Table:
# example of mysql query output for the query "SHOW TABLES LIKE 'log';":
# > SHOW TABLES LIKE 'log';
# > +-----------------------+
# > | Tables_in_quman (log) |
# > +-----------------------+
# > | log |
# > +-----------------------+
# > 1 row in set (0,001 sec)
# >
# >
# logging.debug("parse_mysql_stdout:: stdout = '%s'", stdout)
table = []
line_index = 0
num_horizontal_separators = 0
num_rows = None
num_empty_lines = 0
column_names = None
for line in stdout.split('\n'):
# print_line_chars(line)
line = strip_vt100(line) # remove all ansi/vt100 escape codes
line = line.strip('\r') # remove potential carriage return characters that can be present in the mysql output
# logging.debug("parse_mysql_stdout:: line = '%s' (%d characters, hex=%s)", line, len(line), line.encode('utf-8').hex())
if re.match(r'^\s*\+\-+\+\s*$', line):
num_horizontal_separators += 1
elif re.match(r'^\|.*\|$', line):
# eg '| Tables_in_quman (log) |'
is_header = num_horizontal_separators == 1
if is_header:
column_names = [col.strip() for col in line.split('|')[1:-1]]
else:
assert num_horizontal_separators == 2, f'Unexpected case: a line with values should be between two horizontal separators but got num_horizontal_separators={num_horizontal_separators} for line="{line}"'
values = [value.strip() for value in line.split('|')[1:-1]]
table.append(tuple(values))
elif line.endswith(';'):
# this is the echo of the query that we sent to mysql, we can ignore it but we check that it is indeed the first line of the output
# eg: 'SHOW TABLES LIKE 'log';'
assert line_index == 0, f'Unexpected case: the first line of the mysql output should be the query echo but got "{line}"'
elif re.match(r'^Empty set \([^\)]*\)$', line):
# eg: 'Empty set (0,000 sec)'
num_rows = 0
elif re.match(r'^Query OK', line):
# eg Query OK, 1 row affected (0,005 sec)
pass
else:
m = re.match(r'^\s*(?P<num_rows>\d+)\s+row[s]*\s+in\s+set\s+\(.*\)\s*$', line)
if m:
# eg '1 row in set (0,001 sec)'
num_rows = int(m.group('num_rows'))
elif line.strip() == '':
num_empty_lines += 1
else:
assert False, f'Unexpected line in mysql output: "{line}"'
line_index += 1
if num_rows is None:
num_rows = len(table)
assert num_rows == len(table), f'Unexpected case: the number of rows in the mysql output should be {num_rows} but got {len(table)}.'
if num_rows > 0:
assert num_horizontal_separators == 3, f'Unexpected case: there should be exactly 3 horizontal separators in the mysql output but got {num_horizontal_separators}.'
return Table(column_names=column_names, rows=table)
class SshAccessedMysqlConnection(ISqlConnection):
ssh_mysql_db: 'SshAccessedMysqlDb'
mysql_spawn: Optional[pexpect.spawn] = None
mysql_prompt_re: str
def __init__(self, ssh_mysql_db: 'SshAccessedMysqlDb'):
self.ssh_mysql_db = ssh_mysql_db
self.mysql_spawn = None
self.mysql_prompt_re = r'MariaDB \[[^]]+\]> '
def __enter__(self) -> 'SshAccessedMysqlConnection':
# Set up the connection (e.g., open SSH tunnel, connect to MySQL)
command = f'ssh -t "{self.ssh_mysql_db.ssh_user}@{self.ssh_mysql_db.db_server_fqdn}" "mysql --defaults-group-suffix={self.ssh_mysql_db.db_user}"'
# command = [
# 'ssh',
# '-t',
# f"{self.ssh_mysql_db.ssh_user}@{self.ssh_mysql_db.db_server_fqdn}",
# f'mysql --defaults-group-suffix={self.ssh_mysql_db.db_user}'
# ]
logging.debug("SshAccessedMysqlConnection.__enter__: running ssh command '%s'", command)
try:
self.mysql_spawn = pexpect.spawn(command, encoding='utf-8', timeout=10)
assert isinstance(self.mysql_spawn, pexpect.spawn)
# graffy@alambix50:~$ /usr/bin/ssh -t qumandbw@alambix-master.ipr.univ-rennes.fr 'mysql --defaults-group-suffix=qumandbw'
# Welcome to the MariaDB monitor. Commands end with ; or \g.
# Your MariaDB connection id is 845
# Server version: 10.11.14-MariaDB-0+deb12u2 Debian 12
# Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
# Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
# MariaDB [(none)]>
self.mysql_spawn.expect(self.mysql_prompt_re) # Wait for MySQL prompt
show_databases = False
if show_databases:
self.mysql_spawn.sendline("show databases;")
self.mysql_spawn.expect(self.mysql_prompt_re)
logging.debug("databases:")
logging.debug(self.mysql_spawn.before) # Print output
self.mysql_spawn.sendline(f"use {self.ssh_mysql_db.db_name};")
self.mysql_spawn.expect(self.mysql_prompt_re)
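except (ValueError, pexpect.exceptions.ExceptionPexpect) as e: # pexpect reports timeouts and unexpected end of output with its own exception types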
logging.error("failed to execute the command '%s' (error: %s)", command, str(e))
traceback.print_exc()
raise RuntimeError(f"Error while connecting to mysql: {str(e)}") from e
return self # Return the connection object for use in the 'with'
def __exit__(self, exc_type, exc_val, exc_tb):
# Clean up the connection
logging.debug("SshAccessedMysqlConnection.__exit__: Closing connection...")
self.mysql_spawn.sendline("quit")
self.mysql_spawn.expect(pexpect.EOF)
self.mysql_spawn = None
if exc_type:
logging.error("An error occurred: %s", exc_val)
return False # Propagate exceptions
def query(self, sql_query: SqlQuery) -> List[str]:
"""
:param str sql_query: the sql query to perform
"""
try:
self.mysql_spawn.sendline(sql_query)
self.mysql_spawn.expect(self.mysql_prompt_re)
stdout = self.mysql_spawn.before
except pexpect.exceptions.ExceptionPexpect as e:
logging.error("SshAccessedMysqlDb.query:: error while executing query '%s': %s", sql_query, str(e))
raise RuntimeError(f"Error while executing query '{sql_query}': {e}") from e
logging.debug("SshAccessedMysqlDb.query:: results of query '%s': %s", sql_query, stdout)
rows = stdout.split('\n') if stdout != '' else []
# detect errors in the output such as:
# ERROR 1142 (42000): DELETE command denied to user 'qumandbw'@'localhost' for table `quman`.`disables`
for row in rows:
m = re.match(r'^ERROR ', row)
if m is not None:
raise RuntimeError(f'the sql query "{sql_query}" failed with the error : "{row}"')
return rows
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
# MariaDB [quman]> SELECT JSON_ARRAYAGG(JSON_OBJECT('toto', timestamp, 'iddd', user_id)) FROM log;
# +-------------------------------------------------------------------------------------------------------+
# | JSON_ARRAYAGG(JSON_OBJECT('toto', timestamp, 'iddd', user_id)) |
# +-------------------------------------------------------------------------------------------------------+
# | [{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}] |
# +-------------------------------------------------------------------------------------------------------+
# 1 row in set (0,006 sec)
# columns_list_str = ', '.join([f'"{col}"' for col in columns])
columns_list_str = ', '.join([f"'{col}', {col}" for col in columns])
columns_statement = f'JSON_ARRAYAGG(JSON_OBJECT({columns_list_str}))'
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{columns_statement} FROM {table}"
if join_clause:
sql_query += f" JOIN {join_clause}"
if where_clause:
sql_query += f" WHERE {where_clause}"
sql_query += ';'
lines = self.query(sql_query)
table = parse_mysql_stdout('\n'.join(lines))
assert table.num_cols == 1, f'Unexpected case: the query should return only one column with the json data but got {table.num_cols} columns.'
assert table.num_rows == 1, f'Unexpected case: the query should return only one row with the json data but got {table.num_rows} rows.'
json_str = table.get_value(row=0, col=0)
# json_str is expected to contain a string like '[{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}]'
assert isinstance(json_str, str), f'Expected a string as the value of the only cell in the table output but got {type(json_str)}'
# logging.debug("SshAccessedMysqlDb.query_select:: value (in the form of a stringified json tree) of the cell for the query '%s': '%s'", sql_query, json_str)
# match = re.match(r'^\s*(?P<json_data>{[^}]*})\s*$', data_line)
# assert match, 'Unexpected output format for query "%s" : %s' % (sql_query, stdout)
# json_data = json.loads(match.group('json_data'))
t = json_to_table(json_str)
assert isinstance(t, Table), f'Unexpected case: json_to_table should return a Table object but got {type(t)}'
return t
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
logging.debug("SshAccessedMysqlDb.query_insert:: values = %s", values)
# INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_id, reason) VALUES ('2026-05-26T15:31:53.564287', 'graffy', 'alambix50.ipr.univ-rennes.fr', 'gpuonly.q@alambix104.ipr.univ-rennes1.fr', 'disable', 'quman-sync', 'synchronized with grid engine');SELECT JSON_OBJECT('toto', last_insert_id());
# Query OK, 1 row affected (0,046 sec)
# +---------------------------------------+
# | JSON_OBJECT('toto', last_insert_id()) |
# +---------------------------------------+
# | {"toto": 9} |
# +---------------------------------------+
# 1 row in set (0,002 sec)
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
sql_query = sql_query + ";SELECT JSON_OBJECT('toto', last_insert_id());"
logging.debug("SshAccessedMysqlDb.query_insert:: sql_query = '%s'", sql_query)
stdout_lines = self.query(sql_query)
table: Table = parse_mysql_stdout('\n'.join(stdout_lines))
assert table.num_cols == 1, f'Unexpected case: the query should return only one column with the json data but got {table.num_cols} columns.'
assert table.num_rows == 1, f'Unexpected case: the query should return only one row with the json data but got {table.num_rows} rows.'
data_line = table.get_value(row=0, col=0)
assert isinstance(data_line, str), f'Expected a string as data line in the query output but got {type(data_line)}'
logging.debug("SshAccessedMysqlDb.query_insert:: data line of query '%s': '%s'", sql_query, data_line)
match = re.match(r'^\s*(?P<json_data>{\s*"toto"\s*:\s*(\d+)\s*})\s*$', data_line)
assert match, 'Unexpected output format for query "%s" : %s' % (sql_query, data_line)
json_data = json.loads(match.group('json_data'))
last_insert_id = int(json_data['toto'])
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
return last_insert_id
def table_exists(self, table_name: str) -> bool:
rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
table: Table = parse_mysql_stdout('\n'.join(rows))
logging.debug('len(rows) = %d', len(rows))
logging.debug('rows = %s', str(rows))
assert table.num_rows <= 1, f'Unexpected case: more than one ({table.num_rows}) tables match the table name {table_name}.'
if table.num_rows > 0:
assert table.column_names[0].startswith('Tables_in_'), f'Unexpected case: the column name in the output of "SHOW TABLES LIKE ..." should start with "Tables_in_" but got "{table.column_names[0]}"'
assert table.get_value(row=0, col=0) == table_name, f'Unexpected case: the table name in the output of "SHOW TABLES LIKE ..." should be "{table_name}" but got "{table.get_value(row=0, col=0)}"'
return table.num_rows == 1
def create_table(self, table_name: str, fields: List[SqlTableField]):
# https://www.sqlite.org/autoinc.html
# > The AUTOINCREMENT keyword imposes extra CPU, memory, disk space, and disk I/O overhead and should be avoided if not strictly needed. It is usually not needed.
fields_sql_descriptions = []
for field in fields:
sql_field_type = {
SqlTableField.Type.FIELD_TYPE_FLOAT: 'real NOT NULL',
SqlTableField.Type.FIELD_TYPE_INT: 'int(11) NOT NULL',
SqlTableField.Type.FIELD_TYPE_STRING: 'varchar(256) NOT NULL',
SqlTableField.Type.FIELD_TYPE_TIME: 'datetime NOT NULL',
}[field.field_type]
if field.is_autoinc_index:
assert field.field_type == SqlTableField.Type.FIELD_TYPE_INT
sql_field_type = 'INTEGER PRIMARY KEY AUTO_INCREMENT'
fields_sql_description = self.get_field_directive(field.name, sql_field_type, field.description)
fields_sql_descriptions.append(fields_sql_description)
sql_create_table_command = f'CREATE TABLE `{table_name}` ({",".join(fields_sql_descriptions)});'
logging.debug('sql_create_table_command = %s', sql_create_table_command)
self.query(sql_create_table_command)
def delete_table(self, table_name: str):
sql_drop_table_command = f'DROP TABLE `{table_name}`;'
self.query(sql_drop_table_command)
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
return f'`{field_name}` {field_sql_type} COMMENT \'{field_description}\''
def dump(self, sql_file_path: Path):
# mysqldump -u root --quote-names --opt --single-transaction --quick $db >
db = self.ssh_mysql_db
command = f'ssh "{db.ssh_user}@{db.db_server_fqdn}" "mysqldump --defaults-group-suffix={db.db_user}" {db.db_name} > {sql_file_path}'
_ = subprocess.run(command, shell=True, check=False, capture_output=True)
class SshAccessedMysqlDb(ISqlDatabaseBackend):
"""a mysql database server accessed using ssh instead of a remote mysql client
Instead of accessing the remote sql database from mysql client, this method ssh connects (we expect an unattended connection setup using ssh keys) to the server hosting the database, and once logged in performs a request to the database locally using mysql client running on the server hosting the database.
This method has the following benefits over the simple use of mysql:
- it's more secure since it benefits from ssh encryption
- it doesn't require extra open tcp ports for remote mysql database access
"""
db_server_fqdn: str # the fully qualified domain name of the server hosting the database, eg iprbenchdb.ipr.univ-rennes1.fr
db_user: str # the user for accessing the inventory database, eg iprbenchw
db_name: str # the name of the database, eg iprbench
ssh_user: str # the user on db_server_fqdn that has access to the database db_name
def __init__(self, db_server_fqdn: str, db_user: str, db_name: str, ssh_user: str):
"""
:param str db_server_fqdn: the fully qualified domain name of the server hosting the database, eg iprbenchdb.ipr.univ-rennes1.fr
:param str db_user: the user for accessing the inventory database, eg iprbenchw
:param str db_name: the name of the database, eg iprbench
:param str ssh_user: the user on db_server_fqdn that has access to the database db_name
"""
self.db_server_fqdn = db_server_fqdn
self.db_user = db_user
self.db_name = db_name
self.ssh_user = ssh_user
def connect(self) -> SshAccessedMysqlConnection:
return SshAccessedMysqlConnection(self)
class SqliteConnection(ISqlConnection):
sqlite_db: 'SqliteDb'
_con: sqlite3.Connection
_cur: sqlite3.Cursor
def __init__(self, sql_db: 'SqliteDb'):
self.sqlite_db = sql_db
self._con = None
self._cur = None
def __enter__(self) -> 'SqliteConnection':
# Set up the connection (open the sqlite database, creating it if it doesn't exist yet)
logging.debug("Opening connection...")
check_same_thread = False
# this is to prevent the following error when run from apache/django : SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 139672342353664 and this is thread id 139672333960960
# according to https://stackoverflow.com/questions/48218065/programmingerror-sqlite-objects-created-in-a-thread-can-only-be-used-in-that-sa this is ok, as long as there are no concurrent writes
# If set False, the returned connection may be shared across multiple threads. When using multiple threads with the same connection writing operations should be serialized by the user to avoid data corruption
# I hope it's safe here but I'm not 100% sure though. Anyway, if the database gets corrupt, it's not a big deal since this memory resident database gets reconstructed from the sql file...
sqlite_db_path = self.sqlite_db.sqlite_db_path
if sqlite_db_path == ':memory:' or not sqlite_db_path.exists():
logging.debug('creating sqlite database in %s', sqlite_db_path)
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
else:
logging.debug('reusing existing sqlite database in %s', sqlite_db_path)
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
self._cur = self._con.cursor()
logging.debug('self._con = %s', self._con)
logging.debug('self._cur = %s', self._cur)
_ = self.query('PRAGMA encoding="UTF-8";')
return self # Return the connection object for use in the 'with'
def __exit__(self, exc_type, exc_val, exc_tb):
# Clean up the connection
logging.debug("Closing connection...")
if exc_type:
logging.error("An error occurred: %s", exc_val)
return False # Propagate exceptions
def query(self, sql_query) -> List[str]:
"""
:param str sql_query: the sql query to perform
"""
self._cur.execute(sql_query)
rows = self._cur.fetchall()
self._con.commit()
return rows
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{', '.join(columns)} FROM {table}"
if join_clause:
sql_query += f" JOIN {join_clause}"
if where_clause:
sql_query += f" WHERE {where_clause}"
rows = self.query(sql_query)
return Table(column_names=columns, rows=rows)
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
"""
performs an insert query on the sql database and returns the id of the inserted row
:param str table_id: the name of the table to insert into
:param List[str] fields: the list of fields to insert values into
:param List[tuple] values: the list of values to insert (one tuple per row, tuple values are the column values)
:return: the id of the inserted row
"""
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
logging.debug("SqliteDb.query_insert:: sql_query = '%s'", sql_query)
logging.debug("SqliteDb.query_insert:: values = %s", values)
self._cur.execute(sql_query)
self._cur.execute('SELECT last_insert_rowid();')
rows = self._cur.fetchall()
self._con.commit()
assert len(rows) == 1, f'Unexpected number of rows ({len(rows)}).'
last_insert_id = int(rows[0][0])
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
return last_insert_id
def table_exists(self, table_name: str) -> bool:
rows = self.query(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
assert len(rows) <= 1, f'Unexpected case: more than one ({len(rows)}) tables match the table name {table_name}.'
return len(rows) == 1
def create_table(self, table_name: str, fields: List[SqlTableField]):
# https://www.sqlite.org/autoinc.html
# > The AUTOINCREMENT keyword imposes extra CPU, memory, disk space, and disk I/O overhead and should be avoided if not strictly needed. It is usually not needed.
fields_sql_descriptions = []
for field in fields:
sql_field_type = {
SqlTableField.Type.FIELD_TYPE_FLOAT: 'real NOT NULL',
SqlTableField.Type.FIELD_TYPE_INT: 'int(11) NOT NULL',
SqlTableField.Type.FIELD_TYPE_STRING: 'varchar(256) NOT NULL',
SqlTableField.Type.FIELD_TYPE_TIME: 'datetime NOT NULL',
}[field.field_type]
if field.is_autoinc_index:
assert field.field_type == SqlTableField.Type.FIELD_TYPE_INT
sql_field_type = 'INTEGER PRIMARY KEY'
fields_sql_description = self.get_field_directive(field.name, sql_field_type, field.description)
fields_sql_descriptions.append(fields_sql_description)
sql_create_table_command = f'CREATE TABLE `{table_name}` ({",".join(fields_sql_descriptions)});'
logging.debug('sql_create_table_command = %s', sql_create_table_command)
self.query(sql_create_table_command)
def delete_table(self, table_name: str):
sql_drop_table_command = f'DROP TABLE `{table_name}`;'
self.query(sql_drop_table_command)
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
# sqlite doesn't understand the COMMENT keyword, so we use sql comments ( "--" ), as explained in [https://stackoverflow.com/questions/7426205/sqlite-adding-comment-on-descriptions-to-tables-and-columns]
return f'`{field_name}` {field_sql_type} -- {field_description}\n'
def dump(self, sql_file_path: Path):
with open(sql_file_path, 'wt', encoding='utf8') as f:
for line in self._con.iterdump():
f.write(line)
class SqliteDb(ISqlDatabaseBackend):
sqlite_db_path: Union[Path, str] # the path of the sqlite database file, or ':memory:' (sqlite-specific special name for a database stored in memory)
def __init__(self, sqlite_db_path: Union[Path, str]):
self.sqlite_db_path = sqlite_db_path
def connect(self) -> SqliteConnection:
return SqliteConnection(self)
class SqlFile(SqliteDb):
_sql_file_path: Path
def __init__(self, sql_file_path: Path, truncate_hex_strings=False):
"""
:param str sql_file_path: the path of the sql file containing the inventory database
"""
self._sql_file_path = sql_file_path
# _con: sqlite3.Connection = None
# _cur: sqlite3.Cursor = None
sqlite_db_path = ':memory:' # sqlite-specific special name for a file stored in memory. We could use something like '/tmp/simpadb.sqlite' here but this would make parsing really slow (1 minute instead of 1s), unless either :
# - proper fix : group of INSERT statements are surrounded by BEGIN and COMMIT (see http://stackoverflow.com/questions/4719836/python-and-sqlite3-adding-thousands-of-rows)
# - the file is stored on a solid state disk
try:
os.remove(sqlite_db_path)
except BaseException: # pylint: disable=broad-except
pass
super().__init__(sqlite_db_path)
with open(str(self._sql_file_path), 'r', encoding='utf8') as f: # str conversion has been added to support older versions of python in which open doesn't accept arguments of type Path
sql = f.read() # watch out for built-in `str`
# print(sql)
sqlite_sql = mysql_to_sqlite(sql, truncate_hex_strings)
# print(mysql_to_sqlite(sql))
# with open('/tmp/toto.sqlite.sql', 'w') as f:
# f.write(sqlite_sql)
# with open('/tmp/toto.sqlite.sql', 'r') as f:
# sqlite_sql = f.read()
with self.connect() as conn:
conn._cur.executescript(sqlite_sql)
class TableAttrNotFound(Exception):
def __init__(self, table, key_name, key_value, attr_name):
message = "failed to find in table %s a value for %s where %s is %s" % (table, attr_name, key_name, key_value)
super(TableAttrNotFound, self).__init__(message)
self.table = table
self.key_name = key_name
self.key_value = key_value
self.attr_name = attr_name
class SqlDatabaseReader(object):
def __init__(self, inv_provider: ISqlDatabaseBackend):
"""
:param ISqlDatabaseBackend inv_provider: the input that provides the inventory data
"""
self._inv_provider = inv_provider
def query(self, sql_query: SqlQuery):
"""
performs a query on the sql database
:param SqlQuery sql_query: the sql query to perform
"""
return self._inv_provider.query(sql_query)
def get_table_attr(self, table, key_name, key_value, attr_name):
"""
reads the value of the given attribute of the given item in the given table
:param str table: the name of the table to read
:param str key_name: the name of the column that stores the id of the item to read
:param str key_value: the id of the item to read
:param str attr_name: the name of the attribute to read from the item
"""
attr_value = None
rows = self.query("SELECT " + attr_name + " FROM " + table + " WHERE " + key_name + "='" + key_value + "'")
if len(rows) > 0:
attr_value = rows[0][0]
else:
raise TableAttrNotFound(table, key_name, key_value, attr_name)
return attr_value
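
The way `parse_mysql_stdout` and `strip_vt100` cooperate can be sketched in isolation. The block below is a simplified illustration, not the repository's implementation: the names `strip_escapes` and `parse_ascii_table` are hypothetical, the separator pattern is loosened to also accept multi-column separators, and none of the row-count or query-echo checks are reproduced.

```python
import re

# Same escape-sequence pattern as strip_vt100 in the diff above.
VT100_ESCAPE = re.compile(r'\x1b(?:[@-Z\\_-]|\[[0-?]*[ -/]*[@-~]|\([A-Za-z]|\)[A-Za-z])')


def strip_escapes(text: str) -> str:
    """Remove VT100/ANSI escape sequences from text (hypothetical helper)."""
    return VT100_ESCAPE.sub('', text)


def parse_ascii_table(stdout: str):
    """Return (column_names, rows) from a mysql-style boxed table.

    Simplified sketch: the first boxed line after the first horizontal
    separator is the header, every later boxed line is a data row.
    """
    column_names = None
    rows = []
    num_separators = 0
    for raw_line in stdout.split('\n'):
        line = strip_escapes(raw_line).strip('\r')
        if re.match(r'^\s*\+[-+]+\+\s*$', line):
            # horizontal separator, eg '+-------+' (assumption: '+' may also appear inside)
            num_separators += 1
        elif re.match(r'^\|.*\|$', line):
            cells = [cell.strip() for cell in line.split('|')[1:-1]]
            if num_separators == 1:
                column_names = cells
            else:
                rows.append(tuple(cells))
    return column_names, rows


# Output as it might arrive through ssh -t: CRLF line ends and a leading escape code.
sample = (
    "\x1b[0m+-----------------------+\r\n"
    "| Tables_in_quman (log) |\r\n"
    "+-----------------------+\r\n"
    "| log                   |\r\n"
    "+-----------------------+\r\n"
    "1 row in set (0,001 sec)\r\n"
)
columns, rows = parse_ascii_table(sample)
print(columns, rows)
```

Stripping escape codes and carriage returns before matching keeps the line patterns simple, which is presumably why the original code does it first.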

@ -1,3 +1,6 @@
__version__ = '1.0.32'
class Version(object):
"""
simple version number made of a series of positive integers separated by dots
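
The body of `Version` is not visible in this hunk. As a rough illustration of why a dotted version is handled as a series of integers rather than as a string, here is a minimal sketch; `parse_version` is a hypothetical helper and not part of cocluto.

```python
from typing import Tuple


def parse_version(version_str: str) -> Tuple[int, ...]:
    """Convert a dotted version string such as '1.0.32' into a tuple of ints.

    Hypothetical helper: raises ValueError if a component is not a
    non-negative integer.
    """
    numbers = tuple(int(part) for part in version_str.split('.'))
    if any(number < 0 for number in numbers):
        raise ValueError(f'negative component in version {version_str!r}')
    return numbers


# Tuples compare component by component, so 32 > 9 as expected...
newer_is_greater = parse_version('1.0.32') > parse_version('1.0.9')
# ...whereas plain strings compare character by character ('3' < '9').
string_comparison_is_wrong = '1.0.32' < '1.0.9'
print(newer_is_greater, string_comparison_is_wrong)
```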

39
pyproject.toml Normal file
@ -0,0 +1,39 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "cocluto"
dynamic = ["version"] # the list of fields whose values are discovered by the backend (eg __version__)
description = "compute cluster utility tools"
readme = "README.md"
keywords = ["sql", "hpc", "pdu", "power supply", "inventory", "son of grid engine"]
license = {text = "MIT License"}
dependencies = [
"pygraphviz", # requires apt install graphviz-dev
"mysqlclient",
"pexpect"
]
requires-python = ">= 3.8"
authors = [
{name = "Guillaume Raffy", email = "guillaume.raffy@univ-rennes.fr"}
]
[project.scripts]
quman = "cocluto.quman:main"
[project.urls]
Repository = "https://git.ipr.univ-rennes.fr/cellinfo/cocluto"
[tool.setuptools]
packages = [
"cocluto",
"cocluto.ClusterController",
"cocluto.Ipmi",
"cocluto.SunGridEngine",]
[tool.setuptools.dynamic]
version = {attr = "cocluto.version.__version__"}
[tool.setuptools.package-data]
iprbench = ["resources/**/*"]

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='cocluto',
version=1.06,
version='1.0.9',
description='compute cluster utility tools',
url='https://git.ipr.univ-rennes1.fr/graffy/cocluto',
author='Guillaume Raffy',

63
test/test_quman.py Normal file
@ -0,0 +1,63 @@
from pathlib import Path
import json
import unittest
import logging
# from cocluto import ClusterController
from cocluto.sqldb import SqliteDb
from cocluto.quman import QueueManager, init_db, MockGridEngine, QueuesStatus
class QumanTestCase(unittest.TestCase):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def setUp(self) -> None: # pylint: disable=useless-super-delegation
return super().setUp()
def test_quman(self):
logging.info('test_quman')
db_path = Path('./quman_test/quman.sqlite')
db_path.parent.mkdir(exist_ok=True)
if db_path.exists():
db_path.unlink()
db_backend = SqliteDb(db_path)
init_db(db_backend)
qs = QueuesStatus()
for node_id in range(40, 44):
qs.add_queue(f'main.q@alambix{node_id}', True)
qs.add_queue('gpuonly.q@alambix42', True)
grid_engine = MockGridEngine(qs)
grid_engine.disable_queue_machine('main.q@alambix42') # simulate that the queue is already disabled
with db_backend.connect() as sql_conn:
quman = QueueManager(sql_conn, grid_engine)
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')
print(json.dumps(quman.get_state().as_dict(), indent=2))
print('synchronizing with grid engine...')
quman.synchronize_with_grid_engine()
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')
print(json.dumps(quman.get_state().as_dict(), indent=2))
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
with self.assertRaises(RuntimeError):
# attempting to disable the same queue again with the same disable tag should raise a RuntimeError (the tag is used to uniquely identify the disables on the machine)
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'sysadmin.graffy', 'because I want to test quman')
quman.request_queue_machines_deactivation(['main.q@alambix42'], 'croconaus.maco-update', 'disabled to update maco')
quman.request_queue_machines_activation(['main.q@alambix42'], 'sysadmin.graffy', 'alambix42 has been moved to a new rack')
main_queue_machines = quman.get_queue_machines('main.q')
quman.request_queue_machines_deactivation(main_queue_machines, 'sysadmin.graffy.bug4242', 'disable all cluster to prepare complete shutdown')
main_queue_machines = quman.get_queue_machines('main.q')
quman.request_queue_machines_activation(main_queue_machines, 'sysadmin.graffy.bug4242', 'electricity is back, reactivating all cluster')
quman.synchronize_with_grid_engine()
sql_conn.dump(Path('./quman_test/quman_dump.sql'))
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')
print(json.dumps(quman.get_state().as_dict(), indent=2))
if __name__ == '__main__':
unittest.main()

114
test/test_sqldb.py Normal file
@ -0,0 +1,114 @@
from typing import Any
import unittest
import logging
from pathlib import Path
from cocluto.sqldb import parse_mysql_stdout, ISqlDatabaseBackend, SqliteDb, SshAccessedMysqlDb, SqlTableField
def stringify(value: Any):
return f"'{str(value)}'"
def test_sql_backend(sql_backend: ISqlDatabaseBackend):
table_name = 'colutotestmamul1'
fields = [
SqlTableField('measure_id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the measurement', is_autoinc_index=True),
SqlTableField('measurement_time', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this measurement has been made'),
SqlTableField('cpu_model', SqlTableField.Type.FIELD_TYPE_STRING, 'The exact model of the cpu running the benchmark eg "Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz"'),
SqlTableField('duration', SqlTableField.Type.FIELD_TYPE_FLOAT, 'the duration of the benchmark (in seconds)'),
]
with sql_backend.connect() as sql_conn:
if sql_conn.table_exists(table_name):
sql_conn.delete_table(table_name)
sql_conn.create_table(table_name, fields)
# test ISqlConnection.query method
measurements = [
{
'measurement_time': '1951-12-13 12:34:50',
'cpu_model': 'Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz',
'duration': 0.42
},
{
'measurement_time': '1985-10-26 09:00:00',
'cpu_model': 'Intel(R) Core(TM) i3-1234U CPU @ 1.42GHz',
'duration': 3.14
}
]
for measurement in measurements:
sql_query = f'insert into {table_name}({", ".join([field.name for field in fields if not field.is_autoinc_index])}) values({", ".join([stringify(measurement[field.name]) for field in fields if not field.is_autoinc_index])});'
logging.debug('sql_query = %s', sql_query)
sql_conn.query(sql_query)
table = sql_conn.query_select(['measurement_time', 'duration'], table=table_name)
assert table.num_rows == 2, f'Unexpected case: the number of rows in the result of the query should be 2 but got {table.num_rows}.'
# test ISqlConnection.query_insert method
measurements2 = [
{
'measurement_time': '1955-11-12 06:38:00',
'cpu_model': 'Intel(R) Core(TM) i7-6666U CPU @ 2.50GHz',
'duration': 2.71
},
]
expected_measurement_index = 3 # because two measurements have already been inserted (measure_id = 1 and 2) and measure_id is an autoinc index
field_names = [field.name for field in fields if not field.is_autoinc_index] # computed once, without overwriting the fields list
for measurement in measurements2:
values = tuple(measurement[field_name] for field_name in field_names)
measurement_index = sql_conn.query_insert(table_id=table_name, fields=field_names, values=[values])
assert measurement_index == expected_measurement_index, f'Unexpected case: the index of the inserted measurement should be {expected_measurement_index} (because the first measurement inserted in the table has measure_id = 1 and measure_id is an autoinc index) but got {measurement_index}.'
expected_measurement_index += 1
# test ISqlConnection.query_select method
table = sql_conn.query_select(['measurement_time', 'duration'], table=table_name, where_clause='cpu_model = "Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz"')
assert table.num_rows == 1, f'Unexpected case: the number of rows in the result of the query should be 1 but got {table.num_rows}.'
assert table.rows == [('1951-12-13 12:34:50', 0.42)], f'Unexpected case: the rows in the result of the query should be [("1951-12-13 12:34:50", 0.42)] but got {table.rows}.'
# test ISqlConnection.dump method
sql_conn.dump(Path('/tmp/toto.sql'))
sql_conn.delete_table(table_name)
class SimpadbTestCase(unittest.TestCase):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def setUp(self) -> None: # pylint: disable=useless-super-delegation
return super().setUp()
def test_mysql_output_parser(self):
mysql_output = '''SHOW TABLES LIKE 'log';\n\
+-----------------------+\n\
| Tables_in_quman (log) |\n\
+-----------------------+\n\
| log |\n\
+-----------------------+\n\
1 row in set (0,001 sec)\n\
\n\
\n\
'''
table = parse_mysql_stdout(mysql_output)
self.assertEqual(table.column_names, ['Tables_in_quman (log)'])
self.assertEqual(table.rows, [('log',)])
def test_sqlite_backend(self):
logging.info('test_sqlite_backend')
backend = SqliteDb(Path('/tmp/toto.sqlite'))
test_sql_backend(backend)
# self.assertIsInstance(job_state, JobsState)
def test_ssh_accessed_mysql_backend(self):
logging.info('test_ssh_accessed_mysql_backend')
db_server_fqdn = 'iprbenchsdb.ipr.univ-rennes1.fr'
db_user = 'test_iprbenchw'
db_name = 'test_iprbenchs'
ssh_user = 'test_iprbenchw'
backend = SshAccessedMysqlDb(db_server_fqdn, db_user, db_name, ssh_user)
test_sql_backend(backend)
if __name__ == '__main__':
unittest.main()
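
The expectation in `test_sql_backend` that the third inserted row gets `measure_id` 3 can be reproduced with the standard `sqlite3` module alone. The sketch below is a minimal illustration under assumptions: the table name `measurements` and the sample values are made up, and it uses `?` placeholders instead of the string-built queries shown in the diff.

```python
import sqlite3

# In-memory database, so nothing is written to disk.
con = sqlite3.connect(':memory:')
cur = con.cursor()

# An 'INTEGER PRIMARY KEY' column is an alias for sqlite's rowid and is
# filled automatically when omitted from the INSERT statement.
cur.execute(
    'CREATE TABLE measurements ('
    'measure_id INTEGER PRIMARY KEY, '
    'cpu_model varchar(256) NOT NULL, '
    'duration real NOT NULL)'
)

# Two rows inserted first, as in the first part of the test.
for cpu_model, duration in [('cpu A', 0.42), ('cpu B', 3.14)]:
    cur.execute(
        'INSERT INTO measurements (cpu_model, duration) VALUES (?, ?)',
        (cpu_model, duration),
    )

# Third row: its id is read back with last_insert_rowid().
cur.execute(
    'INSERT INTO measurements (cpu_model, duration) VALUES (?, ?)',
    ('cpu C', 2.71),
)
cur.execute('SELECT last_insert_rowid();')
last_insert_id = int(cur.fetchall()[0][0])
con.commit()
con.close()

print(last_insert_id)
```

On a freshly created table the ids start at 1, so the third insert yields 3. Placeholders are used here only to keep the sketch free of quoting concerns.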