diff --git a/ClusterController/ClusterController.py b/ClusterController/ClusterController.py index 749eec3..06dd3a6 100644 --- a/ClusterController/ClusterController.py +++ b/ClusterController/ClusterController.py @@ -11,6 +11,7 @@ from Log import * from ClusterNodeStatusUpdater import * from SunGridEngine import SunGridEngine import Util +from WebServer import WebServerThread from HTMLParser import HTMLParser @@ -77,9 +78,10 @@ class ClusterController: self.m_lastEnergyStatusLogTime = None self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60 # in seconds self.m_iSessionId = None # session (run) identifier in database + self.m_webServer = WebServerThread(self) def getClusterStatus( self ): - return m_clusterStatus + return self.m_clusterStatus def log( self, message ): print message @@ -220,7 +222,12 @@ class ClusterController: conn.query(sqlCommand) conn.close() - + + def setControlOnMachine(self, machineName, bControl): + """ + adds or removes the control of ClusterController on the given machine + """ + self.m_clusterStatus.setControlOnMachine(machineName, bControl) def run( self ): """ @@ -229,6 +236,7 @@ class ClusterController: log("storeSessionInDatabase completed") DELAY_BETWEEN_MEASURES = 10 # in seconds self.m_clusterStatus.startReadingThreads() + self.m_webServer.start() while not self.m_clusterStatus.isReady(): log('waiting for system to be ready') time.sleep(1) diff --git a/ClusterController/ClusterStatus.py b/ClusterController/ClusterStatus.py index 490ba54..111af65 100644 --- a/ClusterController/ClusterStatus.py +++ b/ClusterController/ClusterStatus.py @@ -17,8 +17,8 @@ class ClusterStatus: self.m_lock = threading.Lock() # to prevent concurrent access to this instance self.m_jobsStateUpdater = JobsStateUpdater( self ) self.m_jobsState = None - self.m_controlledMachineNames = [ 'simpatix30' ] - #self.m_controlledMachineNames = [] # [ 'simpatix30' ] + #self.m_controlledMachineNames = [ 'simpatix30' ] + self.m_controlledMachineNames = [] # [ 'simpatix30' ] if False: for iMachine in range(11, 40): if (iMachine == 31) or (iMachine == 32): @@ -36,6 +36,27 @@ class ClusterStatus: self.m_clusterNodes[ nodeName ] = clusterNode return + + def setControlOnMachine(self, machineName, bControl): + if bControl: + # add machineName under control of ClusterController + for k, v in self.m_clusterNodes.iteritems(): + if v.getName() == machineName : + return # nothing to do : machineName is already under the control of ClusterController + + clusterNode = ClusterNode( machineName, self, self.m_gridEngine ) + if machineName == 'simpatix10': + clusterNode.setShouldAlwaysBeOn() + self.m_clusterNodes[ machineName ] = clusterNode + clusterNode.m_machineStatusUpdater.start() + else: + # remove machineName from control of ClusterController + clusterNode = self.m_clusterNodes.get(machineName) + if clusterNode: + clusterNode.m_machineStatusUpdater.m_bStop = True + clusterNode.m_machineStatusUpdater.join() + self.m_clusterNodes.pop(machineName) + def getGridEngine( self ): return self.m_gridEngine diff --git a/ClusterController/WebServer.py b/ClusterController/WebServer.py new file mode 100644 index 0000000..ea84c30 --- /dev/null +++ b/ClusterController/WebServer.py @@ -0,0 +1,97 @@ +#Copyright Jon Berg , turtlemeat.com + +import string,cgi,time +from os import curdir, sep +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +import threading +import Util +#import pri +from urlparse import urlparse, parse_qs +#>>> url = 'http://example.com/?foo=bar&one=1' +#>>> parse_qs(urlparse(url).query) +#{'foo': ['bar'], 'one': ['1']} + + +class MyHandler(BaseHTTPRequestHandler): + + def do_GET(self): + try: + paramsDict=parse_qs(urlparse(self.path).query) + if self.path.endswith(".html"): + f = open(curdir + sep + self.path) #self.path has /test.html +#note that this potentially makes every file on your computer readable by the internet + + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + self.wfile.write(f.read()) + f.close() + return + if self.path.endswith(".esp"): #our dynamic content + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + self.wfile.write("hey, today is the" + str(time.localtime()[7])) + self.wfile.write(" day in the year " + str(time.localtime()[0])) + return + if self.path.endswith("ShowControlledMachines"): #http://simpatix10.univ-rennes1.fr:8080/ShowControlledMachines + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + self.wfile.write("hey, today is the" + str(time.localtime()[7])) + self.wfile.write(" day in the year " + str(time.localtime()[0])) + for machine in self.server.m_clusterController.m_clusterStatus.m_clusterNodes.itervalues(): + self.wfile.write("%s is controlled by ClusterController" % machine.getName()) + return + if urlparse(self.path).path == '/SetControlOnMachine': #http://simpatix10.univ-rennes1.fr:8080/SetControlOnMachine?machineName=simpatix30&control=1 + machineName = paramsDict['machineName'][0] + bControl = (paramsDict['control'][0] == '1') + self.server.m_clusterController.setControlOnMachine(machineName, bControl) + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + if bControl == True: + self.wfile.write("%s is now controlled by ClusterController" % machineName) + else: + self.wfile.write("%s is no longer controlled by ClusterController" % machineName) + return + + return + + except IOError: + self.send_error(404,'File Not Found: %s' % self.path) + + + def do_POST(self): + global rootnode + try: + ctype, pdict = cgi.parse_header(self.headers.getheader('content-type')) + if ctype == 'multipart/form-data': + query=cgi.parse_multipart(self.rfile, pdict) + self.send_response(301) + + self.end_headers() + upfilecontent = query.get('upfile') + print "filecontent", upfilecontent[0] + self.wfile.write("POST OK.

"); + self.wfile.write(upfilecontent[0]); + + except : + pass + +class WebServerThread( threading.Thread ): + def __init__( self, clusterController ): + threading.Thread.__init__(self) + #self.m_clusterController = clusterController + self.m_bStop = False + self.m_httpServer = HTTPServer(('', 8080), MyHandler) + self.m_httpServer.m_clusterController = clusterController + def run( self ): + try: + while not self.m_bStop: + self.m_httpServer.handle_request() + #self.m_httpServer.serve_forever() + except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt) + self.m_httpServer.socket.close() + Util.onException(exception) +