115 lines
5.2 KiB
Python
115 lines
5.2 KiB
Python
from typing import Any
|
|
import unittest
|
|
import logging
|
|
from pathlib import Path
|
|
from cocluto.sqldb import parse_mysql_stdout, ISqlDatabaseBackend, SqliteDb, SshAccessedMysqlDb, SqlTableField
|
|
|
|
|
|
def stringify(value: Any):
|
|
return f"'{str(value)}'"
|
|
|
|
|
|
def test_sql_backend(sql_backend: ISqlDatabaseBackend):
|
|
table_name = 'colutotestmamul1'
|
|
fields = [
|
|
SqlTableField('measure_id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the measurement', is_autoinc_index=True),
|
|
SqlTableField('measurement_time', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this measurment has been made'),
|
|
SqlTableField('cpu_model', SqlTableField.Type.FIELD_TYPE_STRING, 'The exact model of the cpu running the benchmark eg "Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz"'),
|
|
SqlTableField('duration', SqlTableField.Type.FIELD_TYPE_FLOAT, 'the duration of the benchmark (in seconds)'),
|
|
]
|
|
with sql_backend.connect() as sql_conn:
|
|
if sql_conn.table_exists(table_name):
|
|
sql_conn.delete_table(table_name)
|
|
sql_conn.create_table(table_name, fields)
|
|
|
|
# test ISqlConnection.query method
|
|
measurements = [
|
|
{
|
|
'measurement_time': '1951-12-13 12:34:50',
|
|
'cpu_model': 'Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz',
|
|
'duration': 0.42
|
|
},
|
|
{
|
|
'measurement_time': '1985-10-26 09:00:00',
|
|
'cpu_model': 'Intel(R) Core(TM) i3-1234U CPU @ 1.42GHz',
|
|
'duration': 3.14
|
|
}
|
|
]
|
|
|
|
for measurement in measurements:
|
|
sql_query = f'insert into {table_name}({", ".join([field.name for field in fields if not field.is_autoinc_index])}) values({", ".join([stringify(measurement[field.name]) for field in fields if not field.is_autoinc_index])});'
|
|
logging.debug('sql_query = %s', sql_query)
|
|
sql_conn.query(sql_query)
|
|
table = sql_conn.query_select(['measurement_time', 'duration'], table=table_name)
|
|
assert table.num_rows == 2, f'Unexpected case: the number of rows in the result of the query should be 2 but got {table.num_rows}.'
|
|
|
|
# test ISqlConnection.query_insert method
|
|
measurements2 = [
|
|
{
|
|
'measurement_time': '1955-11-12 06:38:00',
|
|
'cpu_model': 'Intel(R) Core(TM) i7-6666U CPU @ 2.50GHz',
|
|
'duration': 2.71
|
|
},
|
|
]
|
|
|
|
expected_measurement_index = 3 # because the first measurement inserted in the table has measure_id = 1 and measure_id is an autoinc index
|
|
for measurement in measurements2:
|
|
fields = [field.name for field in fields if not field.is_autoinc_index]
|
|
values = tuple(measurement[field] for field in fields)
|
|
measurement_index = sql_conn.query_insert(table_id=table_name, fields=fields, values=[values])
|
|
assert measurement_index == expected_measurement_index, f'Unexpected case: the index of the inserted measurement should be {expected_measurement_index} (because the first measurement inserted in the table has measure_id = 1 and measure_id is an autoinc index) but got {measurement_index}.'
|
|
expected_measurement_index += 1
|
|
|
|
# test ISqlConnection.query_select method
|
|
|
|
table = sql_conn.query_select(['measurement_time', 'duration'], table=table_name, where_clause='cpu_model = "Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz"')
|
|
assert table.num_rows == 1, f'Unexpected case: the number of rows in the result of the query should be 1 but got {table.num_rows}.'
|
|
assert table.rows == [('1951-12-13 12:34:50', 0.42)], f'Unexpected case: the rows in the result of the query should be [("1951+12-13 12:34:50", 0.42)] but got {table.rows}.'
|
|
|
|
# test ISqlConnection.dump method
|
|
sql_conn.dump(Path('/tmp/toto.sql'))
|
|
sql_conn.delete_table(table_name)
|
|
|
|
|
|
class SimpadbTestCase(unittest.TestCase):
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
def setUp(self) -> None: # pylint: disable=useless-super-delegation
|
|
return super().setUp()
|
|
|
|
def test_mysql_output_parser(self):
|
|
mysql_output = '''SHOW TABLES LIKE 'log';\n\
|
|
+-----------------------+\n\
|
|
| Tables_in_quman (log) |\n\
|
|
+-----------------------+\n\
|
|
| log |\n\
|
|
+-----------------------+\n\
|
|
1 row in set (0,001 sec)\n\
|
|
\n\
|
|
\n\
|
|
'''
|
|
table = parse_mysql_stdout(mysql_output)
|
|
self.assertEqual(table.column_names, ['Tables_in_quman (log)'])
|
|
self.assertEqual(table.rows, [('log',)])
|
|
|
|
def test_sqlite_backend(self):
|
|
logging.info('test_sqlite_backend')
|
|
backend = SqliteDb(Path('/tmp/toto.sqlite'))
|
|
test_sql_backend(backend)
|
|
# self.assertIsInstance(job_state, JobsState)
|
|
|
|
def test_ssh_accessed_mysql_backend(self):
|
|
logging.info('test_ssh_accessed_mysql_backend')
|
|
db_server_fqdn = 'iprbenchsdb.ipr.univ-rennes1.fr'
|
|
db_user = 'test_iprbenchw'
|
|
db_name = 'test_iprbenchs'
|
|
ssh_user = 'test_iprbenchw'
|
|
|
|
backend = SshAccessedMysqlDb(db_server_fqdn, db_user, db_name, ssh_user)
|
|
test_sql_backend(backend)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|