2023-06-07 18:23:46 +02:00
|
|
|
import unittest
|
|
|
|
import logging
|
|
|
|
# from cocluto import ClusterController
|
|
|
|
from cocluto.ClusterController.QstatParser import QstatParser
|
|
|
|
from cocluto.ClusterController.JobsState import JobsState
|
2023-06-08 11:18:07 +02:00
|
|
|
from cocluto.SunGridEngine.SgeConfig import SgeConfig
|
2023-06-07 18:23:46 +02:00
|
|
|
|
|
|
|
|
|
|
|
class CoclutoTestCase(unittest.TestCase):
    """Run cocluto's parsers on command outputs stored as files under ``test/``.

    The fixture paths are relative, so they are resolved against the current
    working directory of the process running the tests.
    """

    # Evaluated once, when the class body is executed: requests a root logger
    # at INFO level with a "time - level - message" line format.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    @staticmethod
    def _read_text(path):
        """Return the entire content of the text file located at ``path``."""
        with open(path, 'rt') as stream:
            return stream.read()

    def setUp(self) -> None:
        """Defer to the parent class; this test case adds no fixture of its own."""
        return super().setUp()

    def test_qstat_parser(self):
        """Parse a stored qstat output and check that a ``JobsState`` is returned."""
        logging.info('test_qstat_parser')
        # NOTE(review): the sample was presumably captured with
        # ``qstat -f -u '*' -pri`` -- confirm against the fixture's origin.
        raw_qstat = self._read_text('test/qstat-output-001.txt')
        parsed_jobs = QstatParser().parseQstatOutput(raw_qstat, cluster_domain='ipr.univ-rennes1.fr')
        self.assertIsInstance(parsed_jobs, JobsState)

    def test_sgeformat1_parser(self):
        """Load a stored qconf output into an ``SgeConfig``.

        Nothing is asserted on the loaded content: the test only fails if
        reading the file or loading the string raises.
        """
        logging.info('test_sgeformat1_parser')
        raw_qconf = self._read_text('test/qconf-se-physix71.stdout')
        host_config = SgeConfig()
        host_config.loadFromSgeFormat1String(raw_qconf)
|
2023-06-07 18:23:46 +02:00
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Executed as a script: hand control over to unittest's command-line runner.
    unittest.main()
|