cocluto v1.0.31 - made SshAccessedMysqlConnection.query handle errors

- as a result, quman now displays a useful error message when a sql query fails
- also added information messages when quman is synchronizing quman database with the current state of the queues, as this process takes 5 seconds

work related to [https://bugzilla.ipr.univ-rennes.fr/show_bug.cgi?id=3093]
This commit is contained in:
Guillaume Raffy 2026-06-02 18:17:40 +02:00
parent e17287fcf8
commit 73522f83ed
3 changed files with 18 additions and 5 deletions

View File

@ -316,10 +316,12 @@ class QueueManager:
self.grid_engine.enable_queue_machine(queue_machine) self.grid_engine.enable_queue_machine(queue_machine)
enable_log_id = self.log_modification(queue_machines, "enable", disable_id, reason, ) # noqa: F841 pylint:disable=unused-variable enable_log_id = self.log_modification(queue_machines, "enable", disable_id, reason, ) # noqa: F841 pylint:disable=unused-variable
for queue_machine in queue_machines: for queue_machine in queue_machines:
logging.debug('deleting disable request %s', dr_to_remove.log_id)
self.sql_conn.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';") self.sql_conn.query(f"DELETE FROM disables WHERE disable_request_id = {dr_to_remove.log_id} AND queue_machine = '{queue_machine}';")
def synchronize_with_grid_engine(self): def synchronize_with_grid_engine(self):
"""synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat.""" """synchronizes the state of the queues in the database with the actual state of the queues in the grid engine by querying qstat."""
logging.info('fixing potential inconsistencies (caused by actors using qmod instead of quman) of quman database with the current state of the cluster queues')
qs = self.grid_engine.get_status() qs = self.grid_engine.get_status()
db_queues = set() db_queues = set()
@ -339,17 +341,21 @@ class QueueManager:
disable_requests = self.get_disable_requests(queue_machine) disable_requests = self.get_disable_requests(queue_machine)
if not is_enabled and len(disable_requests) == 0: if not is_enabled and len(disable_requests) == 0:
# queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_id "unknown" and reason "synchronized with grid engine" # queue is disabled in the grid engine but there is no disable reason in the database, we add a disable reason with disable_id "unknown" and reason "synchronized with grid engine"
self.request_queue_machines_deactivation([queue_machine], "quman-sync", "synchronized with grid engine", perform_disable=False) disable_id = "quman-sync"
logging.warning('adding disable %s on %s because this queue is disabled in grid engine', disable_id, queue_machine)
self.request_queue_machines_deactivation([queue_machine], disable_id, "synchronized with grid engine", perform_disable=False)
assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none." assert len(self.get_disable_requests(queue_machine)) > 0, f"After synchronization, there should be at least one disable reason for queue {queue_machine} but there are none."
elif is_enabled and len(disable_requests) > 0: elif is_enabled and len(disable_requests) > 0:
# queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_id "unknown" with reason "synchronized with grid engine" # queue is enabled in the grid engine but there are disable requests in the database, we remove all disable requests for this queue and disable_id "unknown" with reason "synchronized with grid engine"
for dr in disable_requests: for dr in disable_requests:
logging.warning('removing disable %s on %s because this queue is enabled in grid engine', dr.disable_id, queue_machine)
self.request_queue_machines_activation([queue_machine], dr.disable_id, "synchronized with grid engine", perform_enable=False) self.request_queue_machines_activation([queue_machine], dr.disable_id, "synchronized with grid engine", perform_enable=False)
assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests." assert len(self.get_disable_requests(queue_machine)) == 0, f"After synchronization, there should be no disable requests for queue {queue_machine} but there are still {len(self.get_disable_requests(queue_machine))} disable requests."
for queue_machine in db_queues: for queue_machine in db_queues:
logging.warning("Queue %s is in the database but not in the grid engine. Removing it from the database.", queue_machine) logging.warning("Queue %s is in the database but not in the grid engine. Removing it from the database.", queue_machine)
self.sql_conn.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';") self.sql_conn.query(f"DELETE FROM disables WHERE queue_machine = '{queue_machine}';")
self.sql_conn.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';") self.sql_conn.query(f"DELETE FROM queues WHERE queue_machine = '{queue_machine}';")
logging.info('quman database is consistent with the current state of cluster queues')
def get_state(self) -> QueueDisableState: def get_state(self) -> QueueDisableState:
"""returns the state of the queues.""" """returns the state of the queues."""

View File

@ -314,7 +314,7 @@ def parse_mysql_stdout(stdout: str) -> Table:
line = strip_vt100(line) line = strip_vt100(line)
line = line.strip('\r') # remove potential carriage return characters that can be present in the mysql output line = line.strip('\r') # remove potential carriage return characters that can be present in the mysql output
# remove all ansi escape codes # remove all ansi escape codes
logging.debug("parse_mysql_stdout:: line = '%s' (%d characters, hex=%s)", line, len(line), line.encode('utf-8').hex()) # logging.debug("parse_mysql_stdout:: line = '%s' (%d characters, hex=%s)", line, len(line), line.encode('utf-8').hex())
if re.match(r'^\s*\+\-+\+\s*$', line): if re.match(r'^\s*\+\-+\+\s*$', line):
num_horizontal_separators += 1 num_horizontal_separators += 1
elif re.match(r'^\|.*\|$', line): elif re.match(r'^\|.*\|$', line):
@ -432,6 +432,12 @@ class SshAccessedMysqlConnection(ISqlConnection):
raise RuntimeError(f"Error while executing query '{sql_query}': {e}") from e raise RuntimeError(f"Error while executing query '{sql_query}': {e}") from e
logging.debug("SshAccessedMysqlDb.query:: results of query '%s': %s", sql_query, stdout) logging.debug("SshAccessedMysqlDb.query:: results of query '%s': %s", sql_query, stdout)
rows = stdout.split('\n') if stdout != '' else [] rows = stdout.split('\n') if stdout != '' else []
# detect errors in the output such as:
# ERROR 1142 (42000): DELETE command denied to user 'qumandbw'@'localhost' for table `quman`.`disables`
for row in rows:
m = re.match(r'^ERROR ', row)
if m is not None:
raise RuntimeError(f'the sql query "{sql_query}" failed with the error : "{row}"')
return rows return rows
def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table: def query_select(self, columns: List[str], table: str, where_clause: Optional[str] = None, join_clause: Optional[str] = None, distinct: bool = False) -> Table:
@ -456,8 +462,9 @@ class SshAccessedMysqlConnection(ISqlConnection):
assert table.num_cols == 1, f'Unexpected case: the query should return only one column with the json data but got {table.num_cols} columns.' assert table.num_cols == 1, f'Unexpected case: the query should return only one column with the json data but got {table.num_cols} columns.'
assert table.num_rows == 1, f'Unexpected case: the query should return only one row with the json data but got {table.num_rows} rows.' assert table.num_rows == 1, f'Unexpected case: the query should return only one row with the json data but got {table.num_rows} rows.'
json_str = table.get_value(row=0, col=0) json_str = table.get_value(row=0, col=0)
assert isinstance(json_str, str), f'Expected a string as data line in the query output but got {type(json_str)}' # json_str is expected to contain a string like '[{"toto": "2026-05-21 15:11:58", "iddd": "graffy"},{"toto": "2026-05-22 15:55:39", "iddd": "graffy"}]'
logging.debug("SshAccessedMysqlDb.query_select:: data line of query '%s': '%s'", sql_query, json_str) assert isinstance(json_str, str), f'Expected a string as the value of the only cell in the table output but got {type(json_str)}'
# logging.debug("SshAccessedMysqlDb.query_select:: value (in the form of a stringified json tree) of the cell for the query '%s': '%s'", sql_query, json_str)
# match = re.match(r'^\s*(?P<json_data>{[^}]*})\s*$', data_line) # match = re.match(r'^\s*(?P<json_data>{[^}]*})\s*$', data_line)
# assert match, 'Unexpected output format for query "%s" : %s' % (sql_query, stdout) # assert match, 'Unexpected output format for query "%s" : %s' % (sql_query, stdout)
# json_data = json.loads(match.group('json_data')) # json_data = json.loads(match.group('json_data'))

View File

@ -1,4 +1,4 @@
__version__ = '1.0.30' __version__ = '1.0.31'
class Version(object): class Version(object):