cocluto v1.0.30 - rewrote SimpaDbUtil.ISqlConnection as sqldb.ISqlConnection for performance reasons

- sqldb.ISqlConnection provides the same functionality as SimpaDbUtil.ISqlConnection, with much greater performance thanks to python contexts. However the interface is not compatible, so codes wanting to switch from SimpaDbUtil.ISqlConnection to sqldb.ISqlConnection need adjusting.
- migrated quman to use sqldb.ISqlConnection (as a result the sync now takes 3seconds instead of 60, because of the drastic reductin of calls to mysql and ssh)
- SimpaDbUtil.ISqlConnection will be deleted once every code using SimpaDbUtil.ISqlConnection has been migrated to sqldb.ISqlConnection

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
This commit is contained in:
Guillaume Raffy 2026-05-29 20:11:28 +02:00
parent a7f65a9e66
commit d9e2199d2e
7 changed files with 1016 additions and 105 deletions

View File

@ -35,6 +35,15 @@ pipeline {
"""
}
}
stage('testing sqldb') {
steps {
sh """#!/bin/bash
set -o errexit
source ${VENV_PATH}/bin/activate &&
python3 -m unittest test.test_sqldb
"""
}
}
stage('testing quman') {
steps {
sh """#!/bin/bash

View File

@ -9,7 +9,7 @@ import re
import json
from datetime import datetime
from pathlib import Path
from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField, SshAccessedMysqlDb
from cocluto.sqldb import ISqlDatabaseBackend, ISqlConnection, SqliteDb, SqlTableField, SshAccessedMysqlDb, Table
from cocluto.ClusterController.QstatParser import QstatParser
from cocluto.ClusterController.JobsState import JobsState
@ -135,8 +135,9 @@ class Sge(IGridEngine):
def init_db(db_backend: ISqlDatabaseBackend):
with db_backend.connect() as conn:
# a table storing the log of actions (queue activation or deactivation)
if not db_backend.table_exists('log'):
if not conn.table_exists('log'):
fields = [
SqlTableField('id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the modification', is_autoinc_index=True),
SqlTableField('timestamp', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this modification has been made'),
@ -147,22 +148,22 @@ def init_db(db_backend: ISqlDatabaseBackend):
SqlTableField('disable_id', SqlTableField.Type.FIELD_TYPE_STRING, 'the tag of the disable request that led to this modification (e.g., auto.croconaus, manual.graffy, etc.)'),
SqlTableField('reason', SqlTableField.Type.FIELD_TYPE_STRING, 'the reason for the modification'),
]
db_backend.create_table('log', fields)
conn.create_table('log', fields)
# a table storing the current disable requests
if not db_backend.table_exists('disables'):
if not conn.table_exists('disables'):
fields = [
SqlTableField('disable_request_id', SqlTableField.Type.FIELD_TYPE_INT, 'log.id of the disable action that led to this state'),
SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue machine that was disabled'),
]
db_backend.create_table('disables', fields)
conn.create_table('disables', fields)
# a table storing the current queues
if not db_backend.table_exists('queues'):
if not conn.table_exists('queues'):
fields = [
SqlTableField('queue_machine', SqlTableField.Type.FIELD_TYPE_STRING, 'the name of the queue'),
]
db_backend.create_table('queues', fields)
conn.create_table('queues', fields)
def create_db_backend(db_def: str) -> ISqlDatabaseBackend:
@ -241,11 +242,12 @@ class QueueDisableState:
class QueueManager:
db_backend: ISqlDatabaseBackend
sql_conn: ISqlConnection
grid_engine: IGridEngine
def __init__(self, db_backend: ISqlDatabaseBackend, grid_engine: IGridEngine = None):
self.db_backend = db_backend
def __init__(self, sql_conn: ISqlConnection, grid_engine: IGridEngine = None):
assert isinstance(sql_conn, ISqlConnection)
self.sql_conn = sql_conn
if grid_engine is None:
grid_engine = Sge()
self.grid_engine = grid_engine
@ -256,13 +258,14 @@ class QueueManager:
userid = subprocess.check_output(['whoami']).decode().strip()
host_fqdn = subprocess.check_output(['hostname', '-f']).decode().strip()
timestamp = datetime.now().isoformat()
log_id = self.db_backend.query_insert(table_id="log", fields=['timestamp', 'user_id', 'host_fqdn', 'queue_machines', 'action', 'disable_id', 'reason'], values=[(timestamp, userid, host_fqdn, ','.join(queue_machines), action, disable_id, reason)])
log_id = self.sql_conn.query_insert(table_id="log", fields=['timestamp', 'user_id', 'host_fqdn', 'queue_machines', 'action', 'disable_id', 'reason'], values=[(timestamp, userid, host_fqdn, ','.join(queue_machines), action, disable_id, reason)])
return log_id
def get_disable_requests(self, queue_machine: QueueMachineId) -> Dict[int, DisableRequest]:
results = self.db_backend.query_select(['log.id', 'log.user_id', 'log.host_fqdn', 'log.queue_machines', 'log.reason', 'log.disable_id', 'log.timestamp'], join_clause="disables ON log.id = disables.disable_request_id", table="log", where_clause=f"disables.queue_machine = '{queue_machine}' AND log.action = 'disable'")
results_table = self.sql_conn.query_select(['log.id', 'log.user_id', 'log.host_fqdn', 'log.queue_machines', 'log.reason', 'log.disable_id', 'log.timestamp'], join_clause="disables ON log.id = disables.disable_request_id", table="log", where_clause=f"disables.queue_machine = '{queue_machine}' AND log.action = 'disable'")
assert isinstance(results_table, Table), f"Expected results_table to be of type Table but got {type(results_table)}"
disable_requests = []
for row in results:
for row in results_table.rows:
log_id = row[0]
user_id = row[1]
host_fqdn = row[2]
@ -293,7 +296,7 @@ class QueueManager:
disable_log_id = self.log_modification(queue_machines, "disable", disable_id, reason)
for queue_machine in queue_machines:
self.db_backend.query(f"INSERT INTO disables (disable_request_id, queue_machine) VALUES ({disable_log_id}, '{queue_machine}');")
self.sql_conn.query(f"INSERT INTO disables (disable_request_id, queue_machine) VALUES ({disable_log_id}, '{queue_machine}');")
def request_queue_machines_activation(self, queue_machines: List[QueueMachineId], disable_id: DisableId, reason: str, perform_enable: bool = True):
for queue_machine in queue_machines:
@ -311,18 +314,18 @@ class QueueManager:
if len(disable_requests) == 1:
# queue is currently disabled and there is only one disable reason, we can enable it
self.grid_engine.enable_queue_machine(queue_machine)
enable_log_id = self.log_modification(queue_machines, "enable", disable_id, reason) # noqa: F841 pylint:disable=unused-variable
enable_log_id = self.log_modification(queue_machines, "enable", disable_id, reason, ) # noqa: F841 pylint:disable=unused-variable
for queue_machine in queue_machines:
self.db_backend.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';")
self.sql_conn.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';")
def synchronize_with_grid_engine(self):
"""synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat."""
qs = self.grid_engine.get_status()
db_queues = set()
results = self.db_backend.query_select(columns=["queue_machine"], table="queues")
logging.debug("synchronize_with_grid_engine: results of query': %s", results)
for row in results:
results_table = self.sql_conn.query_select(columns=["queue_machine"], table="queues")
logging.debug("synchronize_with_grid_engine: results of query': %s", results_table)
for row in results_table.rows:
assert len(row) == 1, "Each row should have only one column (queue_machine) but got row='%s' (len=%d)" % (str(row), len(row))
db_queues.add(row[0])
@ -330,7 +333,7 @@ class QueueManager:
if queue_machine not in db_queues:
# if the queue is not in the database, we add it
logging.warning("Queue %s is not in the database, adding it.", queue_machine)
self.db_backend.query(f"INSERT INTO queues (queue_machine) VALUES ('{queue_machine}');")
self.sql_conn.query(f"INSERT INTO queues (queue_machine) VALUES ('{queue_machine}');")
else:
db_queues.remove(queue_machine) # we remove it from the set of queues in the database to keep track of the queues that are in the database but not in the grid engine
disable_requests = self.get_disable_requests(queue_machine)
@ -345,16 +348,16 @@ class QueueManager:
assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests."
for queue_machine in db_queues:
logging.warning("Queue %s is in the database but not in the grid engine. Removing it from the database.", queue_machine)
self.db_backend.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';")
self.db_backend.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';")
self.sql_conn.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';")
self.sql_conn.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';")
def get_state(self) -> QueueDisableState:
"""returns the state of the queues."""
# get the list of queue names from the disables table in the database
results = self.db_backend.query_select(columns=["queue_machine"], table="disables", distinct=True)
for row in results:
table = self.sql_conn.query_select(columns=["queue_machine"], table="disables", distinct=True)
for row in table.rows:
assert len(row) == 1, "Each row should have only one column (queue_machine)"
queue_machines = [row[0] for row in results]
queue_machines = [row[0] for row in table.rows]
state = QueueDisableState()
for queue_machine in queue_machines:
@ -438,11 +441,11 @@ def main():
test_mode = args.test
if test_mode:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.warning("Running in test mode with MockGridEngine. No actual queue will be enabled or disabled.")
else:
logging.basicConfig(level=logging.WARNING)
logging.basicConfig(level=logging.INFO)
if test_mode:
queues_status_file_path = Path('./quman_test/queues_status.json')
@ -459,7 +462,10 @@ def main():
else:
grid_engine = Sge()
db_backend = create_db_backend(args.db_def)
quman = QueueManager(db_backend, grid_engine)
with db_backend.connect() as conn:
assert isinstance(conn, ISqlConnection), f"Expected db_backend.connect() to return an ISqlConnection but got {type(conn)}"
logging.debug("Connected to database successfully with connection %s", conn)
quman = QueueManager(conn, grid_engine)
quman.synchronize_with_grid_engine()

779
cocluto/sqldb.py Normal file
View File

@ -0,0 +1,779 @@
from typing import Union, List, Optional, Tuple
from pathlib import Path
from enum import Enum
import json
import logging
import re
import os
import abc
import sqlite3
from .mysql2sqlite import mysql_to_sqlite
import subprocess
import traceback
import pexpect
SqlQuery = str
class Table:
'''a table is a list of rows, where each row is a tuple of column values
'''
column_names: Optional[List[str]]
rows: List[Tuple]
def __init__(self, column_names: Optional[List[str]], rows: List[Tuple]):
self.column_names = column_names
self.rows = rows
def get_value(self, row: int, col: Union[int, str]) -> Union[str, int, float]:
'''returns the value at the given row and column in this table
:param row: the index of the row (0-based)
:param col: the index or name of the column
'''
if isinstance(col, str):
assert self.column_names is not None, 'Cannot get value by column name because this table does not have column names'
try:
col_index = self.column_names.index(col)
except ValueError as exc:
raise ValueError(f'Column name "{col}" not found in table columns {self.column_names}') from exc
col = col_index
assert isinstance(col, int), f'Unexpected type for col: expected int but got {type(col)}'
assert 0 <= row < len(self.rows), f'Row index {row} is out of bounds for table with {len(self.rows)} rows'
assert 0 <= col < len(self.rows[row]), f'Column index {col} is out of bounds for table with {len(self.rows[row])} columns'
return self.rows[row][col]
def check_integrity(self):
'''checks that all rows in this table have the same number of columns and that the number of column names (if any) matches the number of columns in the rows
'''
num_cols = None
for row in self.rows:
if num_cols is None:
num_cols = len(row)
else:
assert len(row) == num_cols, f'All rows in the table should have the same number of columns but got a row with {len(row)} columns while previous rows have {num_cols} columns'
if self.column_names is not None:
assert len(self.column_names) == num_cols, f'Number of column names {len(self.column_names)} does not match number of columns in rows {num_cols}'
@property
def num_rows(self) -> int:
return len(self.rows)
@property
def num_cols(self) -> int:
nc = len(self.rows[0]) if len(self.rows) > 0 else 0
if self.column_names is not None:
assert len(self.column_names) == nc, f'Number of column names {len(self.column_names)} does not match number of columns in rows {nc}'
return nc
class SqlTableField():
'''description of a field of a sql table
'''
class Type(Enum):
FIELD_TYPE_STRING = 0
FIELD_TYPE_INT = 1
FIELD_TYPE_FLOAT = 2
FIELD_TYPE_TIME = 3
name: str # the name of the field, eg 'matrix_size'
field_type: Type # the type of the field, eg 'PARAM_TYPE_INT'
description: str # the description of the field, eg 'the size n of the n*n matrix '
is_autoinc_index: bool # indicates if theis field is used as an autoincrement index in the table
def __init__(self, name: str, field_type: Type, description: str, is_autoinc_index=False):
if is_autoinc_index:
assert field_type == SqlTableField.Type.FIELD_TYPE_INT, 'only an integer field can be used as a autoincrement table index'
self.name = name
self.field_type = field_type
self.description = description
self.is_autoinc_index = is_autoinc_index
ColumnId = str # eg 'matrix_size'
TableId = str # eg 'benchmark_results'
class ISqlConnection(object):
def __init__(self):
pass
@abc.abstractmethod
def query(self, sql_query: SqlQuery) -> List[str]:
"""
:param str sql_query: the sql query to perform
"""
raise NotImplementedError()
def query_select(self, columns: List[ColumnId], table: TableId, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
"""
performs a select query on the sql database and returns the results in the form of a list of tuples (one tuple per row, tuple values are the column values)
:param List[ColumnId] columns: the columns to select
:param TableId table: the name of the table to query
:param Optional[str] where_clause: the where clause for the query, eg "matrix_size > 100"
:param Optional[str] join_clause: the join clause for the query, eg "disables ON log.id = disables.disable_request_id"
:param bool distinct: whether to return distinct rows
:return: the results of the query
"""
raise NotImplementedError()
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
"""
performs an insert query on the sql database and returns the id of the inserted row
:param str table_id: the name of the table to insert into
:param List[str] fields: the list of fields to insert values into
:param List[tuple] values: the list of values to insert (one tuple per row, tuple values are the column values)
:return: the id of the inserted row
"""
raise NotImplementedError()
@abc.abstractmethod
def table_exists(self, table_name: str) -> bool:
"""returns true if the given table exists in the database
"""
raise NotImplementedError()
@abc.abstractmethod
def create_table(self, table_name: str, fields: List[SqlTableField]):
"""creates the table in this sql database
"""
raise NotImplementedError()
@abc.abstractmethod
def delete_table(self, table_name: str):
"""deletes the given table in this sql database
"""
raise NotImplementedError()
# @abc.abstractmethod
# def add_table_row(self, table_name: str, values: List[SqlTableField]):
# """creates the table in this sql database
# """
# raise NotImplementedError()
@abc.abstractmethod
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
"""returns the sql directive for the declaration of the given table field (eg "`matrix_size` real NOT NULL COMMENT 'the size of the matrix'")
"""
raise NotImplementedError()
@abc.abstractmethod
def dump(self, sql_file_path: Path):
"""dumps this database into the given sql file"""
raise NotImplementedError()
class ISqlDatabaseBackend(object):
def __init__(self):
pass
@abc.abstractmethod
def connect(self) -> ISqlConnection:
"""returns a connection to this sql database backend that can be used to perform sql queries on the database"""
raise NotImplementedError()
def values_to_sql_string(values: List[tuple]) -> str:
'''converts a list of tuples of values into a string that can be used in a sql query, with proper escaping of string values
eg the list of tuples
[('alambix42', 'disabled to move the alambix42 to another rack'), ('alambix42', 'because I want to test quman')]
will be converted into the following string that can be used in a sql query :
"('alambix42', 'disabled to move the alambix42 to another rack'), ('alambix42', 'because I want to test quman')"
'''
sql_values = []
for value_tuple in values:
escaped_value_tuple = tuple(["'%s'" % str(v).replace("'", r"\'") for v in value_tuple])
sql_values.append(f'({", ".join(escaped_value_tuple)})')
return ', '.join(sql_values)
# class RemoteMysqlDb(ISqlDatabaseBackend):
# def __init__(self, db_server_fqdn, db_user, db_name):
# """
# :param str db_server_fqdn: the fully qualified domain name of the server hosting the database, eg simpatix10.univ-rennes1.fr
# :param str db_user: the user for accessing the inventory database, eg simpadb_reader
# :param str db_name: the name of the inventory database, eg simpadb
# """
# self.db_server_fqdn = db_server_fqdn
# self.db_user = db_user
# self.db_name = db_name
# self._connect()
# def _connect(self):
# self._conn = MySQLdb.connect(self.db_server_fqdn, self.db_user, '', self.db_name)
# assert self._conn
# def query(self, sql_query) -> List[str]:
# """
# :param str sql_query: the sql query to perform
# """
# cursor = self._conn.cursor()
# cursor.execute(sql_query)
# rows = cursor.fetchall()
# logging.debug("RemoteMysqlDb.query:: results of query using cursor '%s': %s", sql_query, rows)
# return rows
# def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
# sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{', '.join(columns)} FROM {table}"
# if join_clause:
# sql_query += f" JOIN {join_clause}"
# if where_clause:
# sql_query += f" WHERE {where_clause}"
# return self.query(sql_query)
# def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
# sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
# sql_query = sql_query + ";SELECT last_insert_id();"
# logging.debug("RemoteMysqlDb.query_insert:: sql_query = '%s'", sql_query)
# rows = self.query(sql_query)
# last_insert_id = int(rows[0][0])
# assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
# return last_insert_id
# def table_exists(self, table_name: str) -> bool:
# rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
# assert len(rows) <= 1, f'Unexpected case: more than one ({len(rows)}) tables match the table name {table_name}.'
# return len(rows) == 1
# def create_table(self, table_name: str, fields: List[SqlTableField]):
# raise NotImplementedError()
# def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
# return f'`{field_name}` {field_sql_type} COMMENT \'{field_description}\''
# def dump(self, sql_file_path: Path):
# raise NotImplementedError()
def json_to_table(json_str: str) -> Table:
assert isinstance(json_str, str), f'Expected a string as input for json_to_table but got {type(json_str)}' # eg '[{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}]'
# logging.debug("json_to_table:: json_str = '%s'", json_str)
if json_str == 'NULL':
return Table(column_names=None, rows=[])
try:
json_data = json.loads(json_str)
except json.JSONDecodeError:
logging.error("json_to_table:: invalid json string: '%s'", json_str)
raise
assert isinstance(json_data, list), f'Expected a list of rows in the json string but got {type(json_data)}'
table = []
for row in json_data:
row_values = []
for value in row.values():
assert isinstance(value, (str, int, float)), f'Expected a string, int or float value for each column value in the json string but got {type(value)}'
row_values.append(value)
table.append(tuple(row_values))
return Table(column_names=None, rows=table)
def print_line_chars(line: str):
for char in line:
# char identifier, eg 'ESC' for 0x1b
char_name = {
'\x1b': 'ESC',
'\r': 'CR',
'\n': 'LF'
}.get(char, repr(char))
logging.debug('char = \'%s\' (hex=%s)', char_name, char.encode("utf-8").hex())
def strip_vt100(text):
# ANSI: Standardized, starts with \x1b[.
# VT100: Includes private sequences like \x1b(B, \x1b(A, \x1b>, etc., which are not ANSI but widely supported in terminal emulators.
# Matches all VT100 escape sequences:
# - CSI: \x1b[... (e.g., \x1b[0;1m)
# - Fe: \x1b(..., \x1b), \x1b> (e.g., \x1b(B, \x1b>A)
# - Single-character: \x1b[@-Z\\_-]
vt100_escape = re.compile(r'\x1b(?:[@-Z\\_-]|\[[0-?]*[ -/]*[@-~]|\([A-Za-z]|\)[A-Za-z])')
return vt100_escape.sub('', text)
def parse_mysql_stdout(stdout: str) -> Table:
# example of mysql query output for the query "SHOW TABLES LIKE 'log';":
# > SHOW TABLES LIKE 'log';
# > +-----------------------+
# > | Tables_in_quman (log) |
# > +-----------------------+
# > | log |
# > +-----------------------+
# > 1 row in set (0,001 sec)
# >
# >
# logging.debug("parse_mysql_stdout:: stdout = '%s'", stdout)
table = []
line_index = 0
num_horizontal_separators = 0
num_rows = None
num_empty_lines = 0
column_names = None
for line in stdout.split('\n'):
# print_line_chars(line)
line = strip_vt100(line)
line = line.strip('\r') # remove potential carriage return characters that can be present in the mysql output
# remove all ansi escape codes
logging.debug("parse_mysql_stdout:: line = '%s' (%d characters, hex=%s)", line, len(line), line.encode('utf-8').hex())
if re.match(r'^\s*\+\-+\+\s*$', line):
num_horizontal_separators += 1
elif re.match(r'^\|.*\|$', line):
# eg '| Tables_in_quman (log) |'
is_header = num_horizontal_separators == 1
if is_header:
column_names = [col.strip() for col in line.split('|')[1:-1]]
else:
assert num_horizontal_separators == 2, f'Unexpected case: a line with values should be between two horizontal separators but got num_horizontal_separators={num_horizontal_separators} for line="{line}"'
values = [line.strip() for line in line.split('|')[1:-1]]
table.append(tuple(values))
elif line.endswith(';'):
# this is the echo of the query that we sent to mysql, we can ignore it but we check that it is indeed the first line of the output
# eg: 'SHOW TABLES LIKE 'log';'
assert line_index == 0, f'Unexpected case: the first line of the mysql output should be the query echo but got "{line}"'
elif re.match(r'^Empty set \([^\)]*\)$', line):
# eg: 'Empty set (0,000 sec)'
num_rows = 0
elif re.match(r'^Query OK', line):
# eg Query OK, 1 row affected (0,005 sec)
pass
else:
m = re.match(r'^\s*(?P<num_rows>\d+)\s+row[s]*\s+in\s+set\s+\(.*\)\s*$', line)
if m:
# eg '1 row in set (0,001 sec)'
num_rows = int(m.group('num_rows'))
elif line.strip() == '':
num_empty_lines += 1
else:
assert False, f'Unexpected line in mysql output: "{line}"'
line_index += 1
if num_rows is None:
num_rows = len(table)
assert num_rows == len(table), f'Unexpected case: the number of rows in the mysql output should be {num_rows} but got {len(table)}.'
if num_rows > 0:
assert num_horizontal_separators == 3, f'Unexpected case: there should be exactly 3 horizontal separators in the mysql output but got {num_horizontal_separators}.'
return Table(column_names=column_names, rows=table)
class SshAccessedMysqlConnection(ISqlConnection):
ssh_mysql_db: 'SshAccessedMysqlDb'
mysql_spawn: Optional[pexpect.spawn] = None
mysql_prompt_re: str
def __init__(self, ssh_mysql_db: 'SshAccessedMysqlDb'):
self.ssh_mysql_db = ssh_mysql_db
self.mysql_spawn = None
self.mysql_prompt_re = r'MariaDB \[[^]]+\]> '
def __enter__(self) -> 'SshAccessedMysqlConnection':
# Set up the connection (e.g., open SSH tunnel, connect to MySQL)
command = f'ssh -t "{self.ssh_mysql_db.ssh_user}@{self.ssh_mysql_db.db_server_fqdn}" "mysql --defaults-group-suffix={self.ssh_mysql_db.db_user}"'
# command = [
# 'ssh',
# '-t',
# f"{self.ssh_mysql_db.ssh_user}@{self.ssh_mysql_db.db_server_fqdn}",
# f'mysql --defaults-group-suffix={self.ssh_mysql_db.db_user}'
# ]
logging.debug("SshAccessedMysqlConnection.__enter__: running ssh command '%s'", command)
try:
self.mysql_spawn = pexpect.spawn(command, encoding='utf-8', timeout=10)
assert isinstance(self.mysql_spawn, pexpect.spawn)
# graffy@alambix50:~$ /usr/bin/ssh -t qumandbw@alambix-master.ipr.univ-rennes.fr 'mysql --defaults-group-suffix=qumandbw'
# Welcome to the MariaDB monitor. Commands end with ; or \g.
# Your MariaDB connection id is 845
# Server version: 10.11.14-MariaDB-0+deb12u2 Debian 12
# Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
# Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
# MariaDB [(none)]>
self.mysql_spawn.expect(self.mysql_prompt_re) # Wait for MySQL prompt
show_databases = False
if show_databases:
self.mysql_spawn.sendline("show databases;")
self.mysql_spawn.expect(self.mysql_prompt_re)
logging.debug("databases:")
logging.debug(self.mysql_spawn.before) # Print output
self.mysql_spawn.sendline(f"use {self.ssh_mysql_db.db_name};")
self.mysql_spawn.expect(self.mysql_prompt_re)
except ValueError as e:
logging.error("failed to execute the command '%s' (error: %s)", command, str(e))
traceback.print_exc()
raise RuntimeError(f"Error while connecting to mysql: {str(e)}") from e
return self # Return the connection object for use in the 'with'
def __exit__(self, exc_type, exc_val, exc_tb):
# Clean up the connection
logging.debug("SshAccessedMysqlConnection.__exit__: Closing connection...")
self.mysql_spawn.sendline("quit")
self.mysql_spawn.expect(pexpect.EOF)
self.mysql_spawn = None
if exc_type:
logging.error("An error occurred: %s", exc_val)
return False # Propagate exceptions
def query(self, sql_query: SqlQuery) -> List[str]:
"""
:param str sql_query: the sql query to perform
"""
try:
self.mysql_spawn.sendline(sql_query)
self.mysql_spawn.expect(self.mysql_prompt_re)
stdout = self.mysql_spawn.before
except pexpect.exceptions.ExceptionPexpect as e:
logging.error("SshAccessedMysqlDb.query:: error while executing query '%s': %s", sql_query, str(e))
raise RuntimeError(f"Error while executing query '{sql_query}': {e}") from e
logging.debug("SshAccessedMysqlDb.query:: results of query '%s': %s", sql_query, stdout)
rows = stdout.split('\n') if stdout != '' else []
return rows
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
# MariaDB [quman]> SELECT JSON_ARRAYAGG(JSON_OBJECT('toto', timestamp, 'iddd', user_id)) FROM log;
# +-------------------------------------------------------------------------------------------------------+
# | JSON_ARRAYAGG(JSON_OBJECT('toto', timestamp, 'iddd', user_id)) |
# +-------------------------------------------------------------------------------------------------------+
# | [{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}] |
# +-------------------------------------------------------------------------------------------------------+
# 1 row in set (0,006 sec)
# columns_list_str = ', '.join([f'"{col}"' for col in columns])
columns_list_str = ', '.join([f"'{col}', {col}" for col in columns])
columns_statement = f'JSON_ARRAYAGG(JSON_OBJECT({columns_list_str}))'
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{columns_statement} FROM {table}"
if join_clause:
sql_query += f" JOIN {join_clause}"
if where_clause:
sql_query += f" WHERE {where_clause}"
sql_query += ';'
lines = self.query(sql_query)
table = parse_mysql_stdout('\n'.join(lines))
assert table.num_cols == 1, f'Unexpected case: the query should return only one column with the json data but got {table.num_cols} columns.'
assert table.num_rows == 1, f'Unexpected case: the query should return only one row with the json data but got {table.num_rows} rows.'
json_str = table.get_value(row=0, col=0)
assert isinstance(json_str, str), f'Expected a string as data line in the query output but got {type(json_str)}'
logging.debug("SshAccessedMysqlDb.query_select:: data line of query '%s': '%s'", sql_query, json_str)
# match = re.match(r'^\s*(?P<json_data>{[^}]*})\s*$', data_line)
# assert match, 'Unexpected output format for query "%s" : %s' % (sql_query, stdout)
# json_data = json.loads(match.group('json_data'))
t = json_to_table(json_str)
assert isinstance(t, Table), f'Unexpected case: json_to_table should return a Table object but got {type(t)}'
return t
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
logging.debug("SshAccessedMysqlDb.query_insert:: values = %s", values)
# INSERT INTO log (timestamp, user_id, host_fqdn, queue_machines, action, disable_id, reason) VALUES ('2026-05-26T15:31:53.564287', 'graffy', 'alambix50.ipr.univ-rennes.fr', 'gpuonly.q@alambix104.ipr.univ-rennes1.fr', 'disable', 'quman-sync', 'synchronized with grid engine');SELECT JSON_OBJECT('toto', last_insert_id());
# Query OK, 1 row affected (0,046 sec)
# +---------------------------------------+
# | JSON_OBJECT('toto', last_insert_id()) |
# +---------------------------------------+
# | {"toto": 9} |
# +---------------------------------------+
# 1 row in set (0,002 sec)
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
sql_query = sql_query + ";SELECT JSON_OBJECT('toto', last_insert_id());"
logging.debug("SshAccessedMysqlDb.query_insert:: sql_query = '%s'", sql_query)
stdout_lines = self.query(sql_query)
table: Table = parse_mysql_stdout('\n'.join(stdout_lines))
assert table.num_cols == 1, f'Unexpected case: the query should return only one column with the json data but got {table.num_cols} columns.'
assert table.num_rows == 1, f'Unexpected case: the query should return only one row with the json data but got {table.num_rows} rows.'
data_line = table.get_value(row=0, col=0)
assert isinstance(data_line, str), f'Expected a string as data line in the query output but got {type(data_line)}'
logging.debug("SshAccessedMysqlDb.query_insert:: data line of query '%s': '%s'", sql_query, data_line)
match = re.match(r'^\s*(?P<json_data>{\s*"toto"\s*:\s*(\d+)\s*})\s*$', data_line)
assert match, 'Unexpected output format for query "%s" : %s' % (sql_query, data_line)
json_data = json.loads(match.group('json_data'))
last_insert_id = int(json_data['toto'])
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
return last_insert_id
def table_exists(self, table_name: str) -> bool:
rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
table: Table = parse_mysql_stdout('\n'.join(rows))
logging.debug('len(rows) = %d', len(rows))
logging.debug('rows = %s', str(rows))
assert table.num_rows <= 1, f'Unexpected case: more than one ({len(table)}) tables match the table name {table_name}.'
if table.num_rows > 0:
assert table.column_names[0].startswith('Tables_in_'), f'Unexpected case: the column name in the output of "SHOW TABLES LIKE ..." should start with "Tables_in_" but got "{table.column_names[0]}"'
assert table.get_value(row=0, col=0) == table_name, f'Unexpected case: the table name in the output of "SHOW TABLES LIKE ..." should be "{table_name}" but got "{table.get_value(row=0, col=0)}"'
return table.num_rows == 1
def create_table(self, table_name: str, fields: List[SqlTableField]):
# https://www.sqlite.org/autoinc.html
# > The AUTOINCREMENT keyword imposes extra CPU, memory, disk space, and disk I/O overhead and should be avoided if not strictly needed. It is usually not needed.
fields_sql_descriptions = []
for field in fields:
sql_field_type = {
SqlTableField.Type.FIELD_TYPE_FLOAT: 'real NOT NULL',
SqlTableField.Type.FIELD_TYPE_INT: 'int(11) NOT NULL',
SqlTableField.Type.FIELD_TYPE_STRING: 'varchar(256) NOT NULL',
SqlTableField.Type.FIELD_TYPE_TIME: 'datetime NOT NULL',
}[field.field_type]
if field.is_autoinc_index:
assert field.field_type == SqlTableField.Type.FIELD_TYPE_INT
sql_field_type = 'INTEGER PRIMARY KEY AUTO_INCREMENT'
fields_sql_description = self.get_field_directive(field.name, sql_field_type, field.description)
fields_sql_descriptions.append(fields_sql_description)
sql_create_table_command = f'CREATE TABLE `{table_name}` ({",".join(fields_sql_descriptions)});'
logging.debug('sql_create_table_command = %s', sql_create_table_command)
self.query(sql_create_table_command)
def delete_table(self, table_name: str):
sql_create_table_command = f'DROP TABLE `{table_name}`;'
self.query(sql_create_table_command)
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
return f'`{field_name}` {field_sql_type} COMMENT \'{field_description}\''
def dump(self, sql_file_path: Path):
# mysqldump -u root --quote-names --opt --single-transaction --quick $db >
db = self.ssh_mysql_db
command = f'ssh "{db.ssh_user}@{db.db_server_fqdn}" "mysqldump --defaults-group-suffix={db.db_user}" {db.db_name} > {sql_file_path}'
_ = subprocess.run(command, shell=True, check=False, capture_output=True)
class SshAccessedMysqlDb(ISqlDatabaseBackend):
"""a mysql database server accessed using ssh instead of a remote mysql client
Instead of accessing the remote sql database from mysql client, this method ssh connects (we expect an unattended connection setup using ssh keys) to the server hosting the database, and once logged in performs a request to the database locally using mysql client running on the server hosting the database.
This method has a the following benefits over the simple use of mysql:
- it's more secure since it benefits from ssh encryption
- it doesn't require extra open tcp ports for remote mysql database access
"""
db_server_fqdn: str # the fully qualified domain name of the server hosting the database, eg iprbenchdb.ipr.univ-rennes1.fr
db_user: str # the user for accessing the inventory database, eg iprbenchw
db_name: str # the name of the database, eg iprbench
ssh_user: str # the user on db_server_fqdn that has access to the database db_name
def __init__(self, db_server_fqdn: str, db_user: str, db_name: str, ssh_user: str):
"""
:param str db_server_fqdn: the fully qualified domain name of the server hosting the database, eg iprbenchdb.ipr.univ-rennes1.fr
:param str db_user: the user for accessing the inventory database, eg iprbenchw
:param str db_name: the name of the database, eg iprbench
:param str ssh_user: the user on db_server_fqdn that has access to the database db_name
"""
self.db_server_fqdn = db_server_fqdn
self.db_user = db_user
self.db_name = db_name
self.ssh_user = ssh_user
def connect(self) -> SshAccessedMysqlConnection:
return SshAccessedMysqlConnection(self)
class SqliteConnection(ISqlConnection):
sqlite_db: 'SqliteDb'
_con: sqlite3.Connection
_cur: sqlite3.Cursor
def __init__(self, sql_db: 'SqliteDb'):
self.sqlite_db = sql_db
self._con = None
self._cur = None
def __enter__(self) -> 'SqliteConnection':
# Set up the connection (e.g., open SSH tunnel, connect to MySQL)
logging.debug("Opening connection...")
check_same_thread = False
# this is to prevent the following error when run from apache/django : SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 139672342353664 and this is thread id 139672333960960
# accordig to https://stackoverflow.com/questions/48218065/programmingerror-sqlite-objects-created-in-a-thread-can-only-be-used-in-that-sa this is ok, as long as there are no concurrent writes
# If set False, the returned connection may be shared across multiple threads. When using multiple threads with the same connection writing operations should be serialized by the user to avoid data corruption
# I hope it's safe here but I'm not 100% sure though. Anyway, if the database gets corrupt, it not a big deal since this memory resident database gets reconstructed from the sql file...
sqlite_db_path = self.sqlite_db.sqlite_db_path
if sqlite_db_path == ':memory:' or not sqlite_db_path.exists():
logging.debug('creating sqlite database in %s', sqlite_db_path)
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
else:
logging.debug('reusing existing sqlite database in %s', sqlite_db_path)
self._con = sqlite3.connect(sqlite_db_path, check_same_thread=check_same_thread)
self._cur = self._con.cursor()
logging.debug('self._con = %s', self._con)
logging.debug('self._cur = %s', self._cur)
_ = self.query('PRAGMA encoding="UTF-8";')
return self # Return the connection object for use in the 'with'
def __exit__(self, exc_type, exc_val, exc_tb):
# Clean up the connection
logging.debug("Closing connection...")
if exc_type:
logging.error("An error occurred: %s", exc_val)
return False # Propagate exceptions
def query(self, sql_query) -> List[str]:
"""
:param str sql_query: the sql query to perform
"""
self._cur.execute(sql_query)
rows = self._cur.fetchall()
self._con.commit()
return rows
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
sql_query = f"SELECT {('DISTINCT ' if distinct else '')}{', '.join(columns)} FROM {table}"
if join_clause:
sql_query += f" JOIN {join_clause}"
if where_clause:
sql_query += f" WHERE {where_clause}"
rows = self.query(sql_query)
return Table(column_names=columns, rows=rows)
def query_insert(self, table_id: str, fields: List[str], values: List[tuple]) -> int:
"""
performs an insert query on the sql database and returns the id of the inserted row
:param str table_id: the name of the table to insert into
:param List[str] fields: the list of fields to insert values into
:param List[tuple] values: the list of values to insert (one tuple per row, tuple values are the column values)
:return: the id of the inserted row
"""
sql_query = f"INSERT INTO {table_id} ({', '.join(fields)}) VALUES {values_to_sql_string(values)}"
logging.debug("SqliteDb.query_insert:: sql_query = '%s'", sql_query)
logging.debug("SqliteDb.query_insert:: values = %s", values)
self._cur.execute(sql_query)
self._cur.execute('SELECT last_insert_rowid();')
rows = self._cur.fetchall()
self._con.commit()
assert len(rows) == 1, f'Unexpected number of rows ({len(rows)}).'
last_insert_id = int(rows[0][0])
assert last_insert_id > 0, f'Unexpected last insert id : {last_insert_id}'
return last_insert_id
def table_exists(self, table_name: str) -> bool:
rows = self.query(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
assert len(rows) <= 1, f'Unexpected case: more than one ({len(rows)}) tables match the table name {table_name}.'
return len(rows) == 1
def create_table(self, table_name: str, fields: List[SqlTableField]):
# https://www.sqlite.org/autoinc.html
# > The AUTOINCREMENT keyword imposes extra CPU, memory, disk space, and disk I/O overhead and should be avoided if not strictly needed. It is usually not needed.
fields_sql_descriptions = []
for field in fields:
sql_field_type = {
SqlTableField.Type.FIELD_TYPE_FLOAT: 'real NOT NULL',
SqlTableField.Type.FIELD_TYPE_INT: 'int(11) NOT NULL',
SqlTableField.Type.FIELD_TYPE_STRING: 'varchar(256) NOT NULL',
SqlTableField.Type.FIELD_TYPE_TIME: 'datetime NOT NULL',
}[field.field_type]
if field.is_autoinc_index:
assert field.field_type == SqlTableField.Type.FIELD_TYPE_INT
sql_field_type = 'INTEGER PRIMARY KEY'
fields_sql_description = self.get_field_directive(field.name, sql_field_type, field.description)
fields_sql_descriptions.append(fields_sql_description)
sql_create_table_command = f'CREATE TABLE `{table_name}` ({",".join(fields_sql_descriptions)});'
logging.debug('sql_create_table_command = %s', sql_create_table_command)
self.query(sql_create_table_command)
def delete_table(self, table_name: str):
sql_create_table_command = f'DROP TABLE `{table_name}`;'
self.query(sql_create_table_command)
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
# sqlite doesn't understand the COMMENT keyword, so we use sql comments ( "--" ), as explained in [https://stackoverflow.com/questions/7426205/sqlite-adding-comment-on-descriptions-to-tables-and-columns]
return f'`{field_name}` {field_sql_type} -- {field_description}\n'
def dump(self, sql_file_path: Path):
with open(sql_file_path, 'wt', encoding='utf8') as f:
for line in self._con.iterdump():
f.write(line)
class SqliteDb(ISqlDatabaseBackend):
sqlite_db_path: Union[Path, str] # the path of the sqlite database. ':memory:' # sqlite-specific special name for a file stored in memory. We could use something like '/tmp/simpadb.sqlite' here but this would make parsing really slow (1 minute instead of 1s), unless either :
def __init__(self, sqlite_db_path: Path):
self.sqlite_db_path = sqlite_db_path
def connect(self) -> SqliteConnection:
return SqliteConnection(self)
class SqlFile(SqliteDb):
_sql_file_path: Path
def __init__(self, sql_file_path: Path, truncate_hex_strings=False):
"""
:param str sql_file_path: the path of the sql file containing the inventory database
"""
self._sql_file_path = sql_file_path
# _con: sqlite3.Connection = None
# _cur: sqlite3.Cursor = None
sqlite_db_path = ':memory:' # sqlite-specific special name for a file stored in memory. We could use something like '/tmp/simpadb.sqlite' here but this would make parsing really slow (1 minute instead of 1s), unless either :
# - proper fix : group of INSERT statements are surrounded by BEGIN and COMMIT (see http://stackoverflow.com/questions/4719836/python-and-sqlite3-adding-thousands-of-rows)
# - the file is stored on a solid state disk
try:
os.remove(sqlite_db_path)
except BaseException: # pylint: disable=broad-except
pass
super().__init__(sqlite_db_path)
with open(str(self._sql_file_path), 'r', encoding='utf8') as f: # str conversion has been added to support older versions of python in which open don't accept arguments of type Path
sql = f.read() # watch out for built-in `str`
# print(sql)
sqlite_sql = mysql_to_sqlite(sql, truncate_hex_strings)
# print(mysql_to_sqlite(sql))
# with open('/tmp/toto.sqlite.sql', 'w') as f:
# f.write(sqlite_sql)
# with open('/tmp/toto.sqlite.sql', 'r') as f:
# sqlite_sql = f.read()
with self.connect() as conn:
conn._cur.executescript(sqlite_sql)
class TableAttrNotFound(Exception):
def __init__(self, table, key_name, key_value, attr_name):
message = "failed to find in table %s a value for %s where %s is %s" % (table, attr_name, key_name, key_value)
super(TableAttrNotFound, self).__init__(message)
self.table = table
self.key_name = key_name
self.key_value = key_value
self.attr_name = attr_name
class SqlDatabaseReader(object):
def __init__(self, inv_provider: ISqlDatabaseBackend):
"""
:param ISqlDatabaseBackend inv_provider: the input that provides the inventory data
"""
self._inv_provider = inv_provider
def query(self, sql_query: SqlQuery):
"""
performs a query on the sql database
:param SqlQuery sql_query: the sql query to perform
"""
return self._inv_provider.query(sql_query)
def get_table_attr(self, table, key_name, key_value, attr_name):
"""
reads the value of the fiven attribute of the given item in the given table
:param str table: the name of the table to read
:param str key_name: the name of the column that stores the id of the item to read
:param str key_value: the id of the item to read
:param str attr_name: the name of the attribute to read from the item
"""
attr_value = None
rows = self.query("SELECT " + attr_name + " FROM " + table + " WHERE " + key_name + "='" + key_value + "'")
if len(rows) > 0:
attr_value = rows[0][0]
else:
raise TableAttrNotFound(table, key_name, key_value, attr_name)
return attr_value

View File

@ -1,4 +1,4 @@
__version__ = '1.0.29'
__version__ = '1.0.30'
class Version(object):

View File

@ -12,6 +12,7 @@ license = {text = "MIT License"}
dependencies = [
"pygraphviz", # requires apt install graphviz-dev
"mysqlclient",
"pexpect"
]
requires-python = ">= 3.8"
authors = [

View File

@ -3,7 +3,7 @@ import json
import unittest
import logging
# from cocluto import ClusterController
from cocluto.SimpaDbUtil import SqliteDb
from cocluto.sqldb import SqliteDb
from cocluto.quman import QueueManager, init_db, MockGridEngine, QueuesStatus
@ -28,7 +28,8 @@ class QumanTestCase(unittest.TestCase):
qs.add_queue('gpuonly.q@alambix42', True)
grid_engine = MockGridEngine(qs)
grid_engine.disable_queue_machine('main.q@alambix42') # simulate that the queue is already disabled)
quman = QueueManager(db_backend, grid_engine)
with db_backend.connect() as sql_conn:
quman = QueueManager(sql_conn, grid_engine)
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')
@ -49,8 +50,9 @@ class QumanTestCase(unittest.TestCase):
quman.request_queue_machines_deactivation(main_queue_machines, 'sysadmin.graffy.bug4242', 'disable all cluster to prepare complete shutdown')
main_queue_machines = quman.get_queue_machines('main.q')
quman.request_queue_machines_activation(main_queue_machines, 'sysadmin.graffy.bug4242', 'electricity is back, reactivating all cluster')
quman.synchronize_with_grid_engine()
db_backend.dump(Path('./quman_test/quman_dump.sql'))
sql_conn.dump(Path('./quman_test/quman_dump.sql'))
print('queues state:')
grid_engine.queues_status.print()
print('disable requests:')

114
test/test_sqldb.py Normal file
View File

@ -0,0 +1,114 @@
from typing import Any
import unittest
import logging
from pathlib import Path
from cocluto.sqldb import parse_mysql_stdout, ISqlDatabaseBackend, SqliteDb, SshAccessedMysqlDb, SqlTableField
def stringify(value: Any):
return f"'{str(value)}'"
def test_sql_backend(sql_backend: ISqlDatabaseBackend):
table_name = 'colutotestmamul1'
fields = [
SqlTableField('measure_id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the measurement', is_autoinc_index=True),
SqlTableField('measurement_time', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this measurment has been made'),
SqlTableField('cpu_model', SqlTableField.Type.FIELD_TYPE_STRING, 'The exact model of the cpu running the benchmark eg "Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz"'),
SqlTableField('duration', SqlTableField.Type.FIELD_TYPE_FLOAT, 'the duration of the benchmark (in seconds)'),
]
with sql_backend.connect() as sql_conn:
if sql_conn.table_exists(table_name):
sql_conn.delete_table(table_name)
sql_conn.create_table(table_name, fields)
# test ISqlConnection.query method
measurements = [
{
'measurement_time': '1951-12-13 12:34:50',
'cpu_model': 'Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz',
'duration': 0.42
},
{
'measurement_time': '1985-10-26 09:00:00',
'cpu_model': 'Intel(R) Core(TM) i3-1234U CPU @ 1.42GHz',
'duration': 3.14
}
]
for measurement in measurements:
sql_query = f'insert into {table_name}({", ".join([field.name for field in fields if not field.is_autoinc_index])}) values({", ".join([stringify(measurement[field.name]) for field in fields if not field.is_autoinc_index])});'
logging.debug('sql_query = %s', sql_query)
sql_conn.query(sql_query)
table = sql_conn.query_select(['measurement_time', 'duration'], table=table_name)
assert table.num_rows == 2, f'Unexpected case: the number of rows in the result of the query should be 2 but got {table.num_rows}.'
# test ISqlConnection.query_insert method
measurements2 = [
{
'measurement_time': '1955-11-12 06:38:00',
'cpu_model': 'Intel(R) Core(TM) i7-6666U CPU @ 2.50GHz',
'duration': 2.71
},
]
expected_measurement_index = 3 # because the first measurement inserted in the table has measure_id = 1 and measure_id is an autoinc index
for measurement in measurements2:
fields = [field.name for field in fields if not field.is_autoinc_index]
values = tuple(measurement[field] for field in fields)
measurement_index = sql_conn.query_insert(table_id=table_name, fields=fields, values=[values])
assert measurement_index == expected_measurement_index, f'Unexpected case: the index of the inserted measurement should be {expected_measurement_index} (because the first measurement inserted in the table has measure_id = 1 and measure_id is an autoinc index) but got {measurement_index}.'
expected_measurement_index += 1
# test ISqlConnection.query_select method
table = sql_conn.query_select(['measurement_time', 'duration'], table=table_name, where_clause='cpu_model = "Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz"')
assert table.num_rows == 1, f'Unexpected case: the number of rows in the result of the query should be 1 but got {table.num_rows}.'
assert table.rows == [('1951-12-13 12:34:50', 0.42)], f'Unexpected case: the rows in the result of the query should be [("1951+12-13 12:34:50", 0.42)] but got {table.rows}.'
# test ISqlConnection.dump method
sql_conn.dump(Path('/tmp/toto.sql'))
sql_conn.delete_table(table_name)
class SimpadbTestCase(unittest.TestCase):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def setUp(self) -> None: # pylint: disable=useless-super-delegation
return super().setUp()
def test_mysql_output_parser(self):
mysql_output = '''SHOW TABLES LIKE 'log';\n\
+-----------------------+\n\
| Tables_in_quman (log) |\n\
+-----------------------+\n\
| log |\n\
+-----------------------+\n\
1 row in set (0,001 sec)\n\
\n\
\n\
'''
table = parse_mysql_stdout(mysql_output)
self.assertEqual(table.column_names, ['Tables_in_quman (log)'])
self.assertEqual(table.rows, [('log',)])
def test_sqlite_backend(self):
logging.info('test_sqlite_backend')
backend = SqliteDb(Path('/tmp/toto.sqlite'))
test_sql_backend(backend)
# self.assertIsInstance(job_state, JobsState)
def test_ssh_accessed_mysql_backend(self):
logging.info('test_ssh_accessed_mysql_backend')
db_server_fqdn = 'iprbenchsdb.ipr.univ-rennes1.fr'
db_user = 'test_iprbenchw'
db_name = 'test_iprbenchs'
ssh_user = 'test_iprbenchw'
backend = SshAccessedMysqlDb(db_server_fqdn, db_user, db_name, ssh_user)
test_sql_backend(backend)
if __name__ == '__main__':
unittest.main()