cocluto/test/test_cocluto.py

37 lines
1.2 KiB
Python
Raw Permalink Normal View History

import unittest
import logging
# from cocluto import ClusterController
from cocluto.ClusterController.QstatParser import QstatParser
from cocluto.ClusterController.JobsState import JobsState
from cocluto.SunGridEngine.SgeConfig import SgeConfig
class CoclutoTestCase(unittest.TestCase):
    """Parser tests for cocluto, fed from captured command output stored in files.

    The fixture paths are relative (``test/...``), so the tests depend on the
    working directory they are launched from.
    """

    # Evaluated once, while the class body is executed: requests INFO-level
    # root logging with a timestamp / level / message layout.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    def setUp(self) -> None:
        """Delegate to the base-class fixture; nothing extra is prepared."""
        super().setUp()

    def test_qstat_parser(self):
        # Parse a stored qstat dump and check that the parser hands back a
        # JobsState. Kept as comments rather than a docstring so that
        # unittest's verbose test description stays unchanged.
        logging.info('test_qstat_parser')
        # Fixture presumably captured from `qstat -f -u '*' -pri` -- TODO confirm.
        with open('test/qstat-output-001.txt', 'rt') as qstat_file:
            raw_qstat = qstat_file.read()
        parsed_state = QstatParser().parse_qstat_output(raw_qstat, cluster_domain='ipr.univ-rennes1.fr')
        self.assertIsInstance(parsed_state, JobsState)

    def test_sgeformat1_parser(self):
        # Load a stored qconf dump into an SgeConfig. There is no explicit
        # assertion: the test passes as long as loading raises nothing.
        logging.info('test_sgeformat1_parser')
        with open('test/qconf-se-physix71.stdout', 'rt') as qconf_file:
            raw_qconf = qconf_file.read()
        host_config = SgeConfig()
        host_config.loadFromSgeFormat1String(raw_qconf)
if __name__ == '__main__':
    # Executed as a script (not imported): let unittest discover and run the
    # test cases defined in this module.
    unittest.main()