from typing import Any
import unittest
import logging
from pathlib import Path
from cocluto.sqldb import parse_mysql_stdout, ISqlDatabaseBackend, SqliteDb, SshAccessedMysqlDb, SqlTableField


def stringify(value: Any) -> str:
    """Return ``str(value)`` wrapped in single quotes.

    Used below to turn python values into literals of a hand-built sql statement.
    No escaping is performed, so a value that itself contains a single quote
    yields a broken literal; only use this with trusted test data.
    """
    return f"'{str(value)}'"


def test_sql_backend(sql_backend: ISqlDatabaseBackend):
    """Exercise the given sql backend against a scratch table.

    The scenario, run over a single connection obtained from ``sql_backend.connect()``:

    1. (re)create the scratch table;
    2. insert two rows with raw ``query`` calls and check the row count;
    3. insert one more row with ``query_insert`` and check the returned index;
    4. read one row back with ``query_select`` and a where clause;
    5. dump the database to ``/tmp/toto.sql`` and delete the scratch table.

    NOTE(review): this is a shared helper called by the ``unittest`` methods below,
    not a test by itself; because of its ``test_`` prefix, a runner that collects
    module-level ``test*`` functions (pytest's default) would presumably try to run
    it and fail on the missing ``sql_backend`` argument - confirm before renaming.

    :param sql_backend: the backend to exercise
    :raises AssertionError: if the backend returns something unexpected
    """
    table_name = 'colutotestmamul1'
    fields = [
        SqlTableField('measure_id', SqlTableField.Type.FIELD_TYPE_INT, 'unique identifier of the measurement', is_autoinc_index=True),
        SqlTableField('measurement_time', SqlTableField.Type.FIELD_TYPE_TIME, 'the time (and date) at which this measurment has been made'),
        SqlTableField('cpu_model', SqlTableField.Type.FIELD_TYPE_STRING, 'The exact model of the cpu running the benchmark eg "Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz"'),
        SqlTableField('duration', SqlTableField.Type.FIELD_TYPE_FLOAT, 'the duration of the benchmark (in seconds)'),
    ]
    # the columns that have to be provided explicitly when inserting a row (the autoinc index is not one of them).
    # This is computed once and kept separate from `fields`, so that `fields` keeps holding SqlTableField objects.
    inserted_field_names = [field.name for field in fields if not field.is_autoinc_index]

    with sql_backend.connect() as sql_conn:
        # start from a clean state, in case a previous run left the table behind
        if sql_conn.table_exists(table_name):
            sql_conn.delete_table(table_name)
        sql_conn.create_table(table_name, fields)

        # test ISqlConnection.query method
        measurements = [
            {
                'measurement_time': '1951-12-13 12:34:50',
                'cpu_model': 'Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz',
                'duration': 0.42
            },
            {
                'measurement_time': '1985-10-26 09:00:00',
                'cpu_model': 'Intel(R) Core(TM) i3-1234U CPU @ 1.42GHz',
                'duration': 3.14
            }
        ]
        columns_sql = ', '.join(inserted_field_names)
        for measurement in measurements:
            values_sql = ', '.join([stringify(measurement[field_name]) for field_name in inserted_field_names])
            sql_query = f'insert into {table_name}({columns_sql}) values({values_sql});'
            logging.debug('sql_query = %s', sql_query)
            sql_conn.query(sql_query)

        table = sql_conn.query_select(['measurement_time', 'duration'], table=table_name)
        assert table.num_rows == 2, f'Unexpected case: the number of rows in the result of the query should be 2 but got {table.num_rows}.'

        # test ISqlConnection.query_insert method
        measurements2 = [
            {
                'measurement_time': '1955-11-12 06:38:00',
                'cpu_model': 'Intel(R) Core(TM) i7-6666U CPU @ 2.50GHz',
                'duration': 2.71
            },
        ]
        expected_measurement_index = 3  # because the first measurement inserted in the table has measure_id = 1 and measure_id is an autoinc index
        for measurement in measurements2:
            values = tuple(measurement[field_name] for field_name in inserted_field_names)
            measurement_index = sql_conn.query_insert(table_id=table_name, fields=inserted_field_names, values=[values])
            assert measurement_index == expected_measurement_index, f'Unexpected case: the index of the inserted measurement should be {expected_measurement_index} (because the first measurement inserted in the table has measure_id = 1 and measure_id is an autoinc index) but got {measurement_index}.'
            expected_measurement_index += 1

        # test ISqlConnection.query_select method
        table = sql_conn.query_select(['measurement_time', 'duration'], table=table_name, where_clause='cpu_model = "Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz"')
        assert table.num_rows == 1, f'Unexpected case: the number of rows in the result of the query should be 1 but got {table.num_rows}.'
        assert table.rows == [('1951-12-13 12:34:50', 0.42)], f'Unexpected case: the rows in the result of the query should be [("1951-12-13 12:34:50", 0.42)] but got {table.rows}.'

        # test ISqlConnection.dump method
        sql_conn.dump(Path('/tmp/toto.sql'))

        sql_conn.delete_table(table_name)


class SimpadbTestCase(unittest.TestCase):
    """Tests of the mysql output parser and of the sql backends imported from cocluto.sqldb."""

    # executed once, when the class body is evaluated (ie when this module is imported)
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    def setUp(self) -> None:  # pylint: disable=useless-super-delegation
        return super().setUp()

    def test_mysql_output_parser(self):
        """Check that parse_mysql_stdout extracts the column names and the rows of a one-row result."""
        # a table as drawn by the mysql client, followed by its summary line and blank lines
        mysql_output = (
            "SHOW TABLES LIKE 'log';\n"
            '+-----------------------+\n'
            '| Tables_in_quman (log) |\n'
            '+-----------------------+\n'
            '| log                   |\n'
            '+-----------------------+\n'
            '1 row in set (0,001 sec)\n'
            '\n'
            '\n'
        )
        table = parse_mysql_stdout(mysql_output)
        self.assertEqual(table.column_names, ['Tables_in_quman (log)'])
        self.assertEqual(table.rows, [('log',)])

    def test_sqlite_backend(self):
        """Run the common backend scenario on a SqliteDb built from /tmp/toto.sqlite."""
        logging.info('test_sqlite_backend')
        backend = SqliteDb(Path('/tmp/toto.sqlite'))
        test_sql_backend(backend)

    def test_ssh_accessed_mysql_backend(self):
        """Run the common backend scenario on a SshAccessedMysqlDb.

        NOTE(review): presumably requires ssh access to the database server below - confirm.
        """
        logging.info('test_ssh_accessed_mysql_backend')
        db_server_fqdn = 'iprbenchsdb.ipr.univ-rennes1.fr'
        db_user = 'test_iprbenchw'
        db_name = 'test_iprbenchs'
        ssh_user = 'test_iprbenchw'

        backend = SshAccessedMysqlDb(db_server_fqdn, db_user, db_name, ssh_user)
        test_sql_backend(backend)


if __name__ == '__main__':
    unittest.main()