cocluto v1.05

- added to the class `ISqlDatabaseBackend` the ability to delete a sql table
- also added `SshAccessedMysqlDb`: an implementation of `ISqlDatabaseBackend` that implements the sql database as a remote sql database accessed using ssh
- added unit tests to validate the backends SshAccessedMysqlDb and SqliteDb

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3958]
This commit is contained in:
Guillaume Raffy 2024-11-12 17:36:53 +01:00
parent 101fb6d8b2
commit fbf565fd8a
3 changed files with 169 additions and 1 deletions

View File

@ -16,6 +16,7 @@ from .Util import execute_program, execute_command, log
import abc
import sqlite3
from .mysql2sqlite import mysql_to_sqlite
import subprocess
def is_machine_responding(machineName):
@ -76,25 +77,42 @@ class ISqlDatabaseBackend(object):
"""
:param str sql_query: the sql query to perform
"""
raise NotImplementedError()
@abc.abstractmethod
def table_exists(self, table_name: str) -> bool:
"""returns true if the given table exists in the database
"""
raise NotImplementedError()
@abc.abstractmethod
def create_table(self, table_name: str, fields: List[SqlTableField]):
"""creates the table in this sql database
"""
raise NotImplementedError()
@abc.abstractmethod
def delete_table(self, table_name: str):
"""deletes the given table in this sql database
"""
raise NotImplementedError()
# @abc.abstractmethod
# def add_table_row(self, table_name: str, values: List[SqlTableField]):
# """creates the table in this sql database
# """
# raise NotImplementedError()
@abc.abstractmethod
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
"""returns the sql directive for the declaration of the given table field (eg "`matrix_size` real NOT NULL COMMENT 'the size of the matrix'")
"""
raise NotImplementedError()
@abc.abstractmethod
def dump(self, sql_file_path: Path):
"""dumps this database into the given sql file"""
raise NotImplementedError()
class RemoteMysqlDb(ISqlDatabaseBackend):
@ -136,6 +154,83 @@ class RemoteMysqlDb(ISqlDatabaseBackend):
raise NotImplementedError()
class SshAccessedMysqlDb(ISqlDatabaseBackend):
"""a mysql database server accessed using ssh instead of a remote mysql client
Instead of accessing the remote sql database from mysql client, this method ssh connects (we expect an unattended connection setup using ssh keys) to the server hosting the database, and once logged in performs a request to the database locally using mysql client running on the server hosting the database.
This method has a the following benefits over the simple use of mysql:
- it's more secure since it benefits from ssh encryption
- it doesn't require extra open tcp ports for remote mysql database access
"""
def __init__(self, db_server_fqdn: str, db_user: str, db_name: str, ssh_user: str):
"""
:param str db_server_fqdn: the fully qualified domain name of the server hosting the database, eg iprbenchdb.ipr.univ-rennes1.fr
:param str db_user: the user for accessing the inventory database, eg iprbenchw
:param str db_name: the name of the database, eg iprbench
:param str ssh_user: the user on db_server_fqdn that has access to the database db_name
"""
self._db_server_fqdn = db_server_fqdn
self._db_user = db_user
self._db_name = db_name
self.ssh_user = ssh_user
def query(self, sql_query: SqlQuery):
"""
:param str sql_query: the sql query to perform
"""
escaped_sql_command = sql_query.replace('`', r'\\\`')
command = f'ssh "{self.ssh_user}@{self._db_server_fqdn}" "echo \\"use {self._db_name}; {escaped_sql_command}\\" | mysql --defaults-group-suffix={self._db_user}"'
completed_process = subprocess.run(command, shell=True, check=True, capture_output=True)
rows = completed_process.stdout.decode('utf-8').split('\n')
return rows
def table_exists(self, table_name: str) -> bool:
rows = self.query(f"SHOW TABLES LIKE '{table_name}';")
assert re.match(r'^Tables_in_', rows[0]), f'unexpected value for the 1st line : {rows[0]}. (the 1st line is expected to contain something like "Tables_in_test_iprbenchs (dummy)")'
logging.debug('len(rows) = %d', len(rows))
logging.debug('rows = %s', str(rows))
assert len(rows) <= 3, f'Unexpected case: more than one ({len(rows) - 2}) tables match the table name {table_name}.'
if len(rows) >= 3:
assert rows[1] == table_name
return len(rows) == 3
def create_table(self, table_name: str, fields: List[SqlTableField]):
# https://www.sqlite.org/autoinc.html
# > The AUTOINCREMENT keyword imposes extra CPU, memory, disk space, and disk I/O overhead and should be avoided if not strictly needed. It is usually not needed.
fields_sql_descriptions = []
for field in fields:
sql_field_type = {
SqlTableField.Type.FIELD_TYPE_FLOAT: 'real NOT NULL',
SqlTableField.Type.FIELD_TYPE_INT: 'int(11) NOT NULL',
SqlTableField.Type.FIELD_TYPE_STRING: 'varchar(256) NOT NULL',
SqlTableField.Type.FIELD_TYPE_TIME: 'datetime NOT NULL',
}[field.field_type]
if field.is_autoinc_index:
assert field.field_type == SqlTableField.Type.FIELD_TYPE_INT
sql_field_type = 'INTEGER PRIMARY KEY'
fields_sql_description = self.get_field_directive(field.name, sql_field_type, field.description)
fields_sql_descriptions.append(fields_sql_description)
sql_create_table_command = f'CREATE TABLE `{table_name}` ({",".join(fields_sql_descriptions)});'
logging.debug('sql_create_table_command = %s', sql_create_table_command)
self.query(sql_create_table_command)
def delete_table(self, table_name: str):
sql_create_table_command = f'DROP TABLE `{table_name}`;'
self.query(sql_create_table_command)
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
return f'`{field_name}` {field_sql_type} COMMENT \'{field_description}\''
def dump(self, sql_file_path: Path):
# mysqldump -u root --quote-names --opt --single-transaction --quick $db >
command = f'ssh "{self.ssh_user}@{self._db_server_fqdn}" "mysqldump --defaults-group-suffix={self._db_user}" {self._db_name} > {sql_file_path}'
_ = subprocess.run(command, shell=True, check=True, capture_output=True)
class SqliteDb(ISqlDatabaseBackend):
sqlite_db_path: Union[Path, str] # ':memory:' # sqlite-specific special name for a file stored in memory. We could use something like '/tmp/simpadb.sqlite' here but this would make parsing really slow (1 minute instead of 1s), unless either :
_con: sqlite3.Connection
@ -202,6 +297,10 @@ class SqliteDb(ISqlDatabaseBackend):
logging.debug('sql_create_table_command = %s', sql_create_table_command)
self.query(sql_create_table_command)
def delete_table(self, table_name: str):
sql_create_table_command = f'DROP TABLE `{table_name}`;'
self.query(sql_create_table_command)
def get_field_directive(self, field_name: str, field_sql_type: str, field_description: str) -> str:
# sqlite doesn't understand the COMMENT keyword, so we use sql comments ( "--" ), as explained in [https://stackoverflow.com/questions/7426205/sqlite-adding-comment-on-descriptions-to-tables-and-columns]
return f'`{field_name}` {field_sql_type} -- {field_description}\n'

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='cocluto',
version=1.04,
version=1.05,
description='compute cluster utility tools',
url='https://git.ipr.univ-rennes1.fr/graffy/cocluto',
author='Guillaume Raffy',

69
test/test_simpadb.py Normal file
View File

@ -0,0 +1,69 @@
from typing import Any
import unittest
import logging
from pathlib import Path
from cocluto.SimpaDbUtil import ISqlDatabaseBackend, SqliteDb, SqlTableField, SshAccessedMysqlDb
def stringify(value: Any):
return f"'{str(value)}'"
def test_sql_backend(sql_backend: ISqlDatabaseBackend):
table_name = 'dummy'
fields = [
SqlTableField('family_name', SqlTableField.Type.FIELD_TYPE_STRING, 'family name'),
SqlTableField('birth_area_code', SqlTableField.Type.FIELD_TYPE_INT, 'the number encoding the birth department'),
SqlTableField('height', SqlTableField.Type.FIELD_TYPE_FLOAT, 'height (in meters)'),
SqlTableField('birth_date', SqlTableField.Type.FIELD_TYPE_TIME, 'birth date')
]
if sql_backend.table_exists(table_name):
sql_backend.delete_table(table_name)
sql_backend.create_table(table_name, fields)
persons = [
{
'family_name': 'Dupont',
'birth_area_code': 35,
'height': 1.75,
'birth_date': '1950-11-12 01:23:45'
},
{
'family_name': 'Dupond',
'birth_area_code': 44,
'height': 1.74,
'birth_date': '1951+12-13 12:34:50'
}
]
for person in persons:
sql_query = f'insert into {table_name}({", ".join([field.name for field in fields])}) values({", ".join([stringify(person[field.name]) for field in fields])});'
logging.debug('sql_query = %s', sql_query)
sql_backend.query(sql_query)
sql_backend.dump(Path('/tmp/toto.sql'))
class SimpadbTestCase(unittest.TestCase):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def setUp(self) -> None:
return super().setUp()
def test_sqlite_backend(self):
logging.info('test_sqlite_backend')
backend = SqliteDb(Path('/tmp/toto.sqlite'))
test_sql_backend(backend)
# self.assertIsInstance(job_state, JobsState)
def test_ssh_accessed_mysql_backend(self):
logging.info('test_ssh_accessed_mysql_backend')
db_server_fqdn = 'iprbenchsdb.ipr.univ-rennes1.fr'
db_user = 'test_iprbenchw'
db_name = 'test_iprbenchs'
ssh_user = 'test_iprbenchw'
backend = SshAccessedMysqlDb(db_server_fqdn, db_user, db_name, ssh_user)
test_sql_backend(backend)
if __name__ == '__main__':
unittest.main()