ai ajouté un mécanisme (peu blindé mais qui fonctionne) qui permet d'ajouter ou d'enlever des machines du controle de ClusterController via un serveur http (pour la première fois, on peut interagir avec le daemon ClusterController)

This commit is contained in:
Guillaume Raffy 2012-06-26 16:08:36 +00:00
parent 006d8752c9
commit 319c78dd86
3 changed files with 130 additions and 4 deletions

View File

@ -11,6 +11,7 @@ from Log import *
from ClusterNodeStatusUpdater import * from ClusterNodeStatusUpdater import *
from SunGridEngine import SunGridEngine from SunGridEngine import SunGridEngine
import Util import Util
from WebServer import WebServerThread
from HTMLParser import HTMLParser from HTMLParser import HTMLParser
@ -77,9 +78,10 @@ class ClusterController:
self.m_lastEnergyStatusLogTime = None self.m_lastEnergyStatusLogTime = None
self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60 # in seconds self.DELAY_BETWEEN_ENERGY_STATUS_LOGS = 60 # in seconds
self.m_iSessionId = None # session (run) identifier in database self.m_iSessionId = None # session (run) identifier in database
self.m_webServer = WebServerThread(self)
def getClusterStatus( self ): def getClusterStatus( self ):
return m_clusterStatus return self.m_clusterStatus
def log( self, message ): def log( self, message ):
print message print message
@ -221,6 +223,11 @@ class ClusterController:
conn.close() conn.close()
def setControlOnMachine(self, machineName, bControl):
"""
adds or removes the control of ClusterController on the given machine
"""
self.m_clusterStatus.setControlOnMachine(machineName, bControl)
def run( self ): def run( self ):
""" """
@ -229,6 +236,7 @@ class ClusterController:
log("storeSessionInDatabase completed") log("storeSessionInDatabase completed")
DELAY_BETWEEN_MEASURES = 10 # in seconds DELAY_BETWEEN_MEASURES = 10 # in seconds
self.m_clusterStatus.startReadingThreads() self.m_clusterStatus.startReadingThreads()
self.m_webServer.start()
while not self.m_clusterStatus.isReady(): while not self.m_clusterStatus.isReady():
log('waiting for system to be ready') log('waiting for system to be ready')
time.sleep(1) time.sleep(1)

View File

@ -17,8 +17,8 @@ class ClusterStatus:
self.m_lock = threading.Lock() # to prevent concurrent access to this instance self.m_lock = threading.Lock() # to prevent concurrent access to this instance
self.m_jobsStateUpdater = JobsStateUpdater( self ) self.m_jobsStateUpdater = JobsStateUpdater( self )
self.m_jobsState = None self.m_jobsState = None
self.m_controlledMachineNames = [ 'simpatix30' ] #self.m_controlledMachineNames = [ 'simpatix30' ]
#self.m_controlledMachineNames = [] # [ 'simpatix30' ] self.m_controlledMachineNames = [] # [ 'simpatix30' ]
if False: if False:
for iMachine in range(11, 40): for iMachine in range(11, 40):
if (iMachine == 31) or (iMachine == 32): if (iMachine == 31) or (iMachine == 32):
@ -36,6 +36,27 @@ class ClusterStatus:
self.m_clusterNodes[ nodeName ] = clusterNode self.m_clusterNodes[ nodeName ] = clusterNode
return return
def setControlOnMachine(self, machineName, bControl):
if bControl:
# add machineName under control of ClusterController
for k, v in self.m_clusterNodes.iteritems():
if v.getName() == machineName :
return # nothing to do : machineName is already under the control of ClusterController
clusterNode = ClusterNode( machineName, self, self.m_gridEngine )
if machineName == 'simpatix10':
clusterNode.setShouldAlwaysBeOn()
self.m_clusterNodes[ machineName ] = clusterNode
clusterNode.m_machineStatusUpdater.start()
else:
# remove machineName from control of ClusterController
clusterNode = self.m_clusterNodes.get(machineName)
if clusterNode:
clusterNode.m_machineStatusUpdater.m_bStop = True
clusterNode.m_machineStatusUpdater.join()
self.m_clusterNodes.pop(machineName)
def getGridEngine( self ): def getGridEngine( self ):
return self.m_gridEngine return self.m_gridEngine

View File

@ -0,0 +1,97 @@
#Copyright Jon Berg , turtlemeat.com
import string,cgi,time
from os import curdir, sep
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import threading
import Util
#import pri
from urlparse import urlparse, parse_qs
#>>> url = 'http://example.com/?foo=bar&one=1'
#>>> parse_qs(urlparse(url).query)
#{'foo': ['bar'], 'one': ['1']}
class MyHandler(BaseHTTPRequestHandler):
def do_GET(self):
try:
paramsDict=parse_qs(urlparse(self.path).query)
if self.path.endswith(".html"):
f = open(curdir + sep + self.path) #self.path has /test.html
#note that this potentially makes every file on your computer readable by the internet
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(f.read())
f.close()
return
if self.path.endswith(".esp"): #our dynamic content
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write("hey, today is the" + str(time.localtime()[7]))
self.wfile.write(" day in the year " + str(time.localtime()[0]))
return
if self.path.endswith("ShowControlledMachines"): #http://simpatix10.univ-rennes1.fr:8080/ShowControlledMachines
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write("hey, today is the" + str(time.localtime()[7]))
self.wfile.write(" day in the year " + str(time.localtime()[0]))
for machine in self.server.m_clusterController.m_clusterStatus.m_clusterNodes.itervalues():
self.wfile.write("%s is controlled by ClusterController" % machine.getName())
return
if urlparse(self.path).path == '/SetControlOnMachine': #http://simpatix10.univ-rennes1.fr:8080/SetControlOnMachine?machineName=simpatix30&control=1
machineName = paramsDict['machineName'][0]
bControl = (paramsDict['control'][0] == '1')
self.server.m_clusterController.setControlOnMachine(machineName, bControl)
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
if bControl == True:
self.wfile.write("%s is now controlled by ClusterController" % machineName)
else:
self.wfile.write("%s is no longer controlled by ClusterController" % machineName)
return
return
except IOError:
self.send_error(404,'File Not Found: %s' % self.path)
def do_POST(self):
global rootnode
try:
ctype, pdict = cgi.parse_header(self.headers.getheader('content-type'))
if ctype == 'multipart/form-data':
query=cgi.parse_multipart(self.rfile, pdict)
self.send_response(301)
self.end_headers()
upfilecontent = query.get('upfile')
print "filecontent", upfilecontent[0]
self.wfile.write("<HTML>POST OK.<BR><BR>");
self.wfile.write(upfilecontent[0]);
except :
pass
class WebServerThread( threading.Thread ):
def __init__( self, clusterController ):
threading.Thread.__init__(self)
#self.m_clusterController = clusterController
self.m_bStop = False
self.m_httpServer = HTTPServer(('', 8080), MyHandler)
self.m_httpServer.m_clusterController = clusterController
def run( self ):
try:
while not self.m_bStop:
self.m_httpServer.handle_request()
#self.m_httpServer.serve_forever()
except BaseException, exception: # catches all exceptions, including the ctrl+C (KeyboardInterrupt)
self.m_httpServer.socket.close()
Util.onException(exception)