cocluto/test/test_quman.py

36 lines
1.6 KiB
Python

from pathlib import Path
import unittest
import logging
# from cocluto import ClusterController
from cocluto.SimpaDbUtil import SqliteDb
from cocluto.quman import QueueManager, init_db, Sge
class QumanTestCase(unittest.TestCase):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def setUp(self) -> None:
return super().setUp()
def test_quman(self):
logging.info('test_quman')
db_path = Path('./quman_test/quman.sqlite')
if db_path.exists():
db_path.unlink()
db_backend = SqliteDb(db_path)
init_db(db_backend)
quman = QueueManager(db_backend, Sge(dry_run=True)) # set dry_run to True to not actually run qmod commands
quman.request_queue_deactivation('main.q@alambix42', 'sysadmin.graffy', 'disabled to move the alambix42 to another rack')
with self.assertRaises(AssertionError):
# attempting to disable the same queue again with the same disable tag should raise an assertion error (the tag is used to uniquely identify the disables on the machine)
quman.request_queue_deactivation('main.q@alambix42', 'sysadmin.graffy', 'because I want to test quman')
quman.request_queue_deactivation('main.q@alambix42', 'croconaus.maco-update', 'disabled to update maco')
quman.request_queue_activation('main.q@alambix42', 'sysadmin.graffy', 'alambix42 has been moved to a new rack')
# self.assertIsInstance(job_state, JobsState)
db_backend.dump(Path('./quman_test/quman_dump.sql'))
if __name__ == '__main__':
unittest.main()