From: aguerre Date: Mon, 13 Jan 2014 12:49:46 +0000 (+0000) Subject: Bug fix on launcher for concurrent execution X-Git-Tag: V7_main_EDF_patch_concurrent_140113~3 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=653d82fd45a52d7f44b93beb65eb2825b1d1c8ee;p=modules%2Fkernel.git Bug fix on launcher for concurrent execution --- diff --git a/bin/PortManager.py b/bin/PortManager.py index 0ad853a9b..d820a8555 100644 --- a/bin/PortManager.py +++ b/bin/PortManager.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -*- coding: iso-8859-1 -*- -# Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE +# Copyright (C) 2007-2014 CEA/DEN, EDF R&D, OPEN CASCADE # # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS @@ -59,176 +59,20 @@ if sys.platform == 'win32': multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable. -""" -This class handles port distribution for SALOME sessions. -In case of concurent sessions, each will have its own port number. -""" -class _PortManager(object): # :TODO: must manage lock owner - __metaclass__ = Singleton - # - def __init__(self, startNumber = 2810, limit = 100, timeout=60): - super(_PortManager, self).__init__() - self.__startNumber = startNumber - self.__limit = startNumber + limit - self.__lockedPorts = [] - self.__lock = multiprocessing.Lock() - self.__timeout = timeout - self.__lastChangeTime = time.time() - # - # Test for prefered port number, if asked. - def getPort(self, port=None): - with self.__lock: - if not port or self.isPortUsed(port): - port = self.__startNumber - while self.isPortUsed(port): - if port == self.__limit: - msg = "\n" - msg += "Can't find a free port to launch omniNames\n" - msg += "Try to kill the running servers and then launch SALOME again.\n" - raise RuntimeError, msg - port = port + 1 - # - self.__lockedPorts.append(port) - self.__lastChangeTime = time.time() - return port - # - def releasePort(self, port): - logger.debug("PortManager.releasePort %s"%port) - with self.__lock: - if port in self.__lockedPorts: - self.__lockedPorts.remove(port) - self.__lastChangeTime = time.time() - # - def isBusy(self): - return len(self.__lockedPorts) - # - def isPortUsed(self, port): - return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port) - # - def __isNetworkConnectionActiveOnPort(self, port): - # :NOTE: Under windows: - # netstat options -l and -t are unavailable - # grep command is unavailable - from subprocess import Popen, PIPE - (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate() - import StringIO - buf = StringIO.StringIO(stdout) - ports = buf.readlines() - # search for TCP - LISTEN connections - import re - regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE ); - for item in ports: - try: - p = int(regObj.match(item).group(1)) - if p == port: return True - except: - pass - # - def timeout(self): - return (time.time() - self.__lastChangeTime > self.__timeout) - # - def __str__(self): - with self.__lock: - return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts)) - # -# - -#------------------------------------ -# Communication methods - -_marshall = pickle.dumps -_unmarshall = pickle.loads - -def _send(channel, *args): - buf = _marshall(args) - value = socket.htonl(len(buf)) - size = struct.pack("L",value) - channel.send(size) - channel.send(buf) -# - -def _receive(channel): - size = struct.calcsize("L") - size = channel.recv(size) - try: - size = socket.ntohl(struct.unpack("L", size)[0]) - except struct.error, e: - return '' - - buf = "" - while len(buf) < size: - buf = channel.recv(size - len(buf)) - - return _unmarshall(buf)[0] -# -#------------------------------------ - -GET_PORT_MSG = "GET_PORT" -GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT" -RELEASE_PORT_MSG = "RELEASE_PORT" -STOP_SERVER_MSG = "STOP_SERVER" -TEST_SERVER_MSG = "TEST_SERVER" - -GET_PORT_ACK_MSG = "GET_PORT" -RELEASE_PORT_ACK_MSG = "RELEASE_PORT" -TEST_SERVER_ACK_MSG = "TEST_SERVER" - -class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): - def handle(self): - data = _receive(self.request) - if data == GET_PORT_MSG: - pm = _PortManager() - port = pm.getPort() - response = "%s: %s" % (GET_PORT_ACK_MSG, port) - _send(self.request, response) - elif data.startswith(GET_PREFERED_PORT_MSG): - port = int(data[len(GET_PREFERED_PORT_MSG)+1:]) - pm = _PortManager() - port = pm.getPort(port) - response = "%s: %s" % (GET_PORT_ACK_MSG, port) - _send(self.request, response) - elif data.startswith(RELEASE_PORT_MSG): - port = int(data[len(RELEASE_PORT_MSG)+1:]) - pm = _PortManager() - pm.releasePort(port) - response = "%s" % (RELEASE_PORT_ACK_MSG) - _send(self.request, response) - logger.debug("RELEASE_PORT: %s"%port) - if not pm.isBusy(): - logger.debug("Close server") - config_file, lock_file = _getConfigurationFilename() - try: - os.remove(config_file) - pmlock.release() - os.remove(lock_file) - except: - pass - self.server.shutdown() - #print pm - elif data == STOP_SERVER_MSG: - logger.debug("Close server") - self.server.shutdown() - elif data == TEST_SERVER_MSG: - _send(self.request, TEST_SERVER_ACK_MSG) -# - -class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): - pass - #------------------------------------ # A file locker (Linux only) import fcntl class PortManagerLock: def __init__(self, filename, readonly=False, blocking=True): - self.filename = filename # This will create it if it does not exist already logger.debug("Create lock on %s"%filename) - mode = 'w' - if readonly: - mode = 'r' - self.handle = open(filename, mode) - self.handle.seek(0) # go back to beginning of file to read it multiple times + self.__readonly = readonly self.__blocking = blocking + self.__filename = filename + flag = 'w' + if self.__readonly: + flag = 'r' + self.handle = open(self.__filename, 'a+') def acquire(self): mode = fcntl.LOCK_EX @@ -242,34 +86,10 @@ class PortManagerLock: logger.debug("lock released") def __del__(self): - if logger: - logger.debug("Close lock file") + logger.debug("Close lock file") self.handle.close() + os.remove(self.__filename) # -#------------------------------------ - -# Server address has to be shared by different processes, without any common -# ancestor. -# The "simplest" solution is to define it here as a global variable. Unfortunately, -# availability of the corresponding socket is not guaranted at all. If PortManager -# tries to use a socket it does not own, server is not created (it is identified as -# already existing), clients then connect on this socket but message passing -# between clients and server will not work and SALOME launch will crash. -# We can introduce a port verification procedure automatically called by importing -# this module (i.e. when creating the server). This procedure consists in creating -# a client which sends a specific message to the server that has to be tested. And -# loop on port numbers until a free socket is found and associated to a new server. -# -# Another approach is to let Python socket API select a free port number, then store -# it to a file on server host machine in order to be shared with clients. -# The logical part can be defined as follows. When server is started (by importing -# this module), write server port number to a specific file (do not use a temporary -# file name). Each client then read server address from this same file ; if file is -# not nound, it is an error (add appropriate processing). -# Server could also check file existence and try to use the same address as previous -# server in order to avoid opening too many unecessary sockets ; but we need to apply -# the above verification procedure. This processing is not necessary because TCP socket -# timeout will automatically close unused sockets. def _getConfigurationFilename(): omniorbUserPath = os.getenv("OMNIORB_USER_PATH") @@ -284,114 +104,140 @@ def _getConfigurationFilename(): return (portmanager_config, lock_file) # -def __checkServer(): - while True: - logger.debug("CHECKING SERVER") - status = __newClient(TEST_SERVER_MSG) - if status == TEST_SERVER_ACK_MSG: - break - return (status == TEST_SERVER_ACK_MSG) +def __isPortUsed(port, busy_ports): + return (port in busy_ports) or __isNetworkConnectionActiveOnPort(port) +# + +def __isNetworkConnectionActiveOnPort(port): + # :NOTE: Under windows: + # netstat options -l and -t are unavailable + # grep command is unavailable + from subprocess import Popen, PIPE + (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate() + import StringIO + buf = StringIO.StringIO(stdout) + ports = buf.readlines() + # search for TCP - LISTEN connections + import re + regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE ); + for item in ports: + try: + p = int(regObj.match(item).group(1)) + if p == port: return True + except: + pass # -def __getServerAddress(readonly=True): - address = ("localhost", 0) - try: - config_file, lock_file = _getConfigurationFilename() - lock = PortManagerLock(config_file, readonly, blocking=True) - lock.acquire() - address = eval(lock.handle.read()) - lock.release() - except (IOError, SyntaxError) as e: - logger.debug("no configuration file") - pass - finally: - return address -# +def getPort(preferedPort=None): + logger.debug("GET PORT") -def __setServerAddress(address): config_file, lock_file = _getConfigurationFilename() - lock = PortManagerLock(config_file, readonly=False, blocking=True) - lock.acquire() - logger.debug("setServerAddress: %s"%str(address)) - lock.handle.write(str(address)) - lock.release() -# + with open(lock_file, 'w') as lock: + # acquire lock + fcntl.flock(lock, fcntl.LOCK_EX) + + # read config + config = {'busy_ports':[]} + logger.debug("read configuration file") + try: + with open(config_file, 'r') as f: + config = pickle.load(f) + except IOError: # empty file + pass -def __getServer(): - address = __getServerAddress(readonly=False) - SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately - server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind - server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart - server.server_bind() # Manually bind, to support allow_reuse_address - __setServerAddress(server.server_address) - server.server_activate() - return server -# + logger.debug("load busy_ports: %s"%str(config["busy_ports"])) + + # append port + busy_ports = config["busy_ports"] + port = preferedPort + if not port or __isPortUsed(port, busy_ports): + port = 2810 + while __isPortUsed(port, busy_ports): + if port == 2810+100: + msg = "\n" + msg += "Can't find a free port to launch omniNames\n" + msg += "Try to kill the running servers and then launch SALOME again.\n" + raise RuntimeError, msg + port = port + 1 + logger.debug("found free port: %s"%str(port)) + config["busy_ports"].append(port) + + # write config + logger.debug("write busy_ports: %s"%str(config["busy_ports"])) + try: + with open(config_file, 'w') as f: + pickle.dump(config, f) + except IOError: + pass -pmlock = None -def __startServer(): - try: - config_file, lock_file = _getConfigurationFilename() - global pmlock - pmlock = PortManagerLock(lock_file, readonly=False, blocking=False) - pmlock.acquire() - - server = __getServer() - # Start a thread with the server -- that thread will then start one - # more thread for each request - server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager") - # Exit the server thread when the main thread terminates - #server_thread.setDaemon(True) - server_thread.start() - #print "Server loop running in thread:", server_thread.getName() - except: - logger.debug("Server already started") - pass + # release lock + fcntl.flock(lock, fcntl.LOCK_UN) + + logger.debug("get port: %s"%str(port)) + return port # -def __newClient(message): - address = __getServerAddress(readonly=True) - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - logger.debug("connect client to %s"%str(address)) - sock.connect(address) - _send(sock, message) - response = _receive(sock) - if response.startswith(GET_PORT_ACK_MSG): - port = int(response[len(GET_PORT_ACK_MSG)+1:]) - logger.debug("GET_PORT: %s"%port) - return port - elif response == RELEASE_PORT_ACK_MSG: - logger.debug("Received: %s" % response) - return 0 +def releasePort(port): + port = int(port) + logger.debug("RELEASE PORT (%s)"%port) + + config_file, lock_file = _getConfigurationFilename() + with open(lock_file, 'w') as lock: + # acquire lock + fcntl.flock(lock, fcntl.LOCK_EX) + + # read config + config = {'busy_ports':[]} + logger.debug("read configuration file") + try: + with open(config_file, 'r') as f: + config = pickle.load(f) + except IOError: # empty file pass - elif response == TEST_SERVER_ACK_MSG: - logger.debug("Server is ok") - return TEST_SERVER_ACK_MSG + + logger.debug("load busy_ports: %s"%str(config["busy_ports"])) + + # remove port from list + busy_ports = config["busy_ports"] + + if port in busy_ports: + busy_ports.remove(port) + config["busy_ports"] = busy_ports + + # write config + logger.debug("write busy_ports: %s"%str(config["busy_ports"])) + try: + with open(config_file, 'w') as f: + pickle.dump(config, f) + except IOError: pass - sock.close() - except socket.error: - logger.debug("Unable to connect to server") - return -1 -# -def getPort(preferedPort=None): - if preferedPort: - return __newClient("%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort)) - else: - return __newClient(GET_PORT_MSG) -# + # release lock + fcntl.flock(lock, fcntl.LOCK_UN) -def releasePort(port): - logger.debug("application asks for releasePort %s"%port) - __newClient("%s: %s"%(RELEASE_PORT_MSG,port)) + logger.debug("released port port: %s"%str(port)) # -def stopServer(): - __newClient(STOP_SERVER_MSG) -# +def getBusyPorts(): + config_file, lock_file = _getConfigurationFilename() + with open(lock_file, 'w') as lock: + # acquire lock + fcntl.flock(lock, fcntl.LOCK_EX) + + # read config + config = {'busy_ports':[]} + logger.debug("read configuration file") + try: + with open(config_file, 'r') as f: + config = pickle.load(f) + except IOError: # empty file + pass + + logger.debug("load busy_ports: %s"%str(config["busy_ports"])) -# Auto start: unique instance ; no effect if called multiple times -__startServer() -logger.debug("Server started... do check...") -assert(__checkServer()) + busy_ports = config["busy_ports"] + # release lock + fcntl.flock(lock, fcntl.LOCK_UN) + + return busy_ports +# diff --git a/bin/addToKillList.py b/bin/addToKillList.py index 2fbd02ec3..91068ee45 100755 --- a/bin/addToKillList.py +++ b/bin/addToKillList.py @@ -1,6 +1,6 @@ #! /usr/bin/env python # -*- coding: iso-8859-1 -*- -# Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE +# Copyright (C) 2007-2014 CEA/DEN, EDF R&D, OPEN CASCADE # # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS @@ -36,7 +36,7 @@ def findFileDict(): port = getPortNumber() if verbose(): print "myport = ", port return port - + def addToKillList(command_pid, command, port=None): """ Add the process to the SALOME processes dictionary file. @@ -49,11 +49,15 @@ def addToKillList(command_pid, command, port=None): # retrieve current processes dictionary from killSalomeWithPort import getPiDict if port is None: port=findFileDict() - filedict=getPiDict(port) try: - fpid=open(filedict, 'r') - process_ids=pickle.load(fpid) - fpid.close() + import PortManager + filedict = getPiDict(port, hidden=True, with2809pid=True) + except: + filedict=getPiDict(port) + + try: + with open(filedict, 'r') as fpid: + process_ids=pickle.load(fpid) except: process_ids=[] pass @@ -77,9 +81,8 @@ def addToKillList(command_pid, command, port=None): process_ids.append({int(command_pid): [command]}) dir = os.path.dirname(filedict) if not os.path.exists(dir): os.makedirs(dir, 0777) - fpid = open(filedict,'w') - pickle.dump(process_ids, fpid) - fpid.close() + with open(filedict,'w') as fpid: + pickle.dump(process_ids, fpid) except: if verbose(): print "addToKillList: can not add command %s : %s to the kill list" % ( str(command_pid), command ) pass @@ -100,9 +103,8 @@ def killList(port=None): # provide compatibility with old-style pidict file (not dot-prefixed) if not os.path.exists(filedict): filedict = getPiDict(port, hidden=False) try: - fpid=open(filedict, 'r') - process_ids=pickle.load(fpid) - fpid.close() + with open(filedict, 'r') as fpid: + process_ids=pickle.load(fpid) except: process_ids=[] pass @@ -110,7 +112,6 @@ def killList(port=None): for process_id in process_ids: #print process_id for pid, cmd in process_id.items(): - print "stop process %s : %s"% (pid, cmd[0]) try: os.kill(int(pid),signal.SIGKILL) except: diff --git a/bin/killSalomeWithPort.py b/bin/killSalomeWithPort.py index 23b70628f..1d8b590b7 100755 --- a/bin/killSalomeWithPort.py +++ b/bin/killSalomeWithPort.py @@ -1,6 +1,6 @@ #! /usr/bin/env python # -*- coding: iso-8859-1 -*- -# Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE +# Copyright (C) 2007-2014 CEA/DEN, EDF R&D, OPEN CASCADE # # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS @@ -35,7 +35,7 @@ import os, sys, pickle, signal, commands,glob from salome_utils import verbose import salome_utils -def getPiDict(port,appname='salome',full=True,hidden=True,hostname=None): +def getPiDict(port,appname='salome',full=True,hidden=True,hostname=None, with2809pid=False): """ Get file with list of SALOME processes. This file is located in the user's home directory @@ -70,8 +70,12 @@ def getPiDict(port,appname='salome',full=True,hidden=True,hostname=None): dir = os.getenv("HOME") pass pass + + suffix = "pidict" + if port == 2809 and with2809pid: + suffix = suffix + "-%s"%(os.getpid()) return generateFileName(dir, - suffix="pidict", + suffix=suffix, hidden=hidden, with_username=True, with_hostname=hostname or True, @@ -113,9 +117,9 @@ def appliCleanOmniOrbConfig(port): pass pass else: - os.remove(last_running_config) + os.remove(last_running_config) pass - pass + pass if os.access(omniorb_config,os.F_OK): os.remove(omniorb_config) @@ -199,102 +203,71 @@ def shutdownMyPort(port, cleanup=True): pass pass -def killMyPort(port): - """ - Kill SALOME session running on the specified port. - Parameters: - - port - port number - """ +def __killMyPort(port, filedict): try: - import PortManager - PortManager.releasePort(port) - except ImportError: - pass - - from salome_utils import getShortHostName, getHostName - - # try to shutdown session nomally - import threading, time - threading.Thread(target=shutdownMyPort, args=(port,False)).start() - time.sleep(3) # wait a little, then kill processes (should be done if shutdown procedure hangs up) - - # new-style dot-prefixed pidict file - filedict = getPiDict(port, hidden=True) - # provide compatibility with old-style pidict file (not dot-prefixed) - if not os.path.exists(filedict): filedict = getPiDict(port, hidden=False) - # provide compatibility with old-style pidict file (short hostname) - if not os.path.exists(filedict): filedict = getPiDict(port, hidden=True, hostname=getShortHostName()) - # provide compatibility with old-style pidict file (not dot-prefixed, short hostname) - if not os.path.exists(filedict): filedict = getPiDict(port, hidden=False, hostname=getShortHostName()) - # provide compatibility with old-style pidict file (long hostname) - if not os.path.exists(filedict): filedict = getPiDict(port, hidden=True, hostname=getHostName()) - # provide compatibility with old-style pidict file (not dot-prefixed, long hostname) - if not os.path.exists(filedict): filedict = getPiDict(port, hidden=False, hostname=getHostName()) - # - try: - fpid = open(filedict, 'r') - # - from salome_utils import generateFileName - if sys.platform == "win32": - username = os.getenv( "USERNAME" ) - else: - username = os.getenv('USER') - path = os.path.join('/tmp/logs', username) - fpidomniNames = generateFileName(path, - prefix="", - suffix="Pid_omniNames", - extension="log", - with_port=port) - if not sys.platform == 'win32': - cmd = 'pid=`ps -eo pid,command | egrep "[0-9] omniNames -start %s"` ; echo $pid > %s' % ( str(port), fpidomniNames ) - a = os.system(cmd) - pass - try: - fpidomniNamesFile = open(fpidomniNames) - lines = fpidomniNamesFile.readlines() - fpidomniNamesFile.close() - os.remove(fpidomniNames) - for l in lines: - try: - pidfield = l.split()[0] # pid should be at the first position - if sys.platform == "win32": - import win32pm - if verbose(): print 'stop process '+pidfield+' : omniNames' - win32pm.killpid(int(pidfield),0) - else: - if verbose(): print 'stop process '+pidfield+' : omniNames' - os.kill(int(pidfield),signal.SIGKILL) - pass - pass - except: - pass + with open(filedict, 'r') as fpid: + # + from salome_utils import generateFileName + if sys.platform == "win32": + username = os.getenv( "USERNAME" ) + else: + username = os.getenv('USER') + path = os.path.join('/tmp/logs', username) + fpidomniNames = generateFileName(path, + prefix="", + suffix="Pid_omniNames", + extension="log", + with_port=port) + if not sys.platform == 'win32': + cmd = 'pid=`ps -eo pid,command | egrep "[0-9] omniNames -start %s"` ; echo $pid > %s' % ( str(port), fpidomniNames ) + a = os.system(cmd) pass - pass - except: - pass - # - try: - process_ids=pickle.load(fpid) - fpid.close() - for process_id in process_ids: - for pid, cmd in process_id.items(): - if verbose(): print "stop process %s : %s"% (pid, cmd[0]) + try: + with open(fpidomniNames) as fpidomniNamesFile: + lines = fpidomniNamesFile.readlines() + + os.remove(fpidomniNames) + for l in lines: try: + pidfield = l.split()[0] # pid should be at the first position if sys.platform == "win32": import win32pm - win32pm.killpid(int(pid),0) + if verbose(): print 'stop process '+pidfield+' : omniNames' + win32pm.killpid(int(pidfield),0) else: - os.kill(int(pid),signal.SIGKILL) + if verbose(): print 'stop process '+pidfield+' : omniNames' + os.kill(int(pidfield),signal.SIGKILL) pass pass except: - if verbose(): print " ------------------ process %s : %s not found"% (pid, cmd[0]) pass - pass # for pid, cmd ... - pass # for process_id ... - pass # try... - except: - pass + pass + pass + except: + pass + # + try: + process_ids=pickle.load(fpid) + for process_id in process_ids: + for pid, cmd in process_id.items(): + if verbose(): print "stop process %s : %s"% (pid, cmd[0]) + try: + if sys.platform == "win32": + import win32pm + win32pm.killpid(int(pid),0) + else: + os.kill(int(pid),signal.SIGKILL) + pass + pass + except: + if verbose(): print " ------------------ process %s : %s not found"% (pid, cmd[0]) + pass + pass # for pid, cmd ... + pass # for process_id ... + pass # try... + except: + pass + # end with # os.remove(filedict) cmd='ps -eo pid,command | egrep "[0-9] omniNames -start '+str(port)+'" | sed -e "s%[^0-9]*\([0-9]*\) .*%\\1%g"' @@ -303,13 +276,50 @@ def killMyPort(port): while pid and len(a.split()) < 2: a = commands.getoutput("kill -9 " + pid) pid = commands.getoutput(cmd) - #print pid pass pass except: print "Cannot find or open SALOME PIDs file for port", port pass # +# + +def killMyPort(port): + """ + Kill SALOME session running on the specified port. + Parameters: + - port - port number + """ + from salome_utils import getShortHostName, getHostName + print "Terminating SALOME on port %d..."%(port) + + # try to shutdown session normally + import threading, time + threading.Thread(target=shutdownMyPort, args=(port,False)).start() + time.sleep(3) # wait a little, then kill processes (should be done if shutdown procedure hangs up) + + try: + import PortManager + filedict = getPiDict(port, hidden=True, with2809pid=False) + import glob + all_files = glob.glob("%s*"%filedict) + for f in all_files: + __killMyPort(port, f) + except: + # new-style dot-prefixed pidict file + filedict = getPiDict(port, hidden=True) + # provide compatibility with old-style pidict file (not dot-prefixed) + if not os.path.exists(filedict): filedict = getPiDict(port, hidden=False) + # provide compatibility with old-style pidict file (short hostname) + if not os.path.exists(filedict): filedict = getPiDict(port, hidden=True, hostname=getShortHostName()) + # provide compatibility with old-style pidict file (not dot-prefixed, short hostname) + if not os.path.exists(filedict): filedict = getPiDict(port, hidden=False, hostname=getShortHostName()) + # provide compatibility with old-style pidict file (long hostname) + if not os.path.exists(filedict): filedict = getPiDict(port, hidden=True, hostname=getHostName()) + # provide compatibility with old-style pidict file (not dot-prefixed, long hostname) + if not os.path.exists(filedict): filedict = getPiDict(port, hidden=False, hostname=getHostName()) + __killMyPort(port, filedict) + # appliCleanOmniOrbConfig(port) pass @@ -320,15 +330,15 @@ def killNotifdAndClean(port): - port - port number """ try: - filedict=getPiDict(port) - f=open(filedict, 'r') - pids=pickle.load(f) - for d in pids: - for pid,process in d.items(): - if 'notifd' in process: - cmd='kill -9 %d'% pid - os.system(cmd) - os.remove(filedict) + filedict=getPiDict(port) + with open(filedict, 'r') as f: + pids=pickle.load(f) + for d in pids: + for pid,process in d.items(): + if 'notifd' in process: + cmd='kill -9 %d'% pid + os.system(cmd) + os.remove(filedict) except: #import traceback #traceback.print_exc() diff --git a/bin/runSalome.py b/bin/runSalome.py index 4b3a42530..87539c111 100755 --- a/bin/runSalome.py +++ b/bin/runSalome.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -*- coding: iso-8859-1 -*- -# Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE +# Copyright (C) 2007-2014 CEA/DEN, EDF R&D, OPEN CASCADE # # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS @@ -491,15 +491,6 @@ def startSalome(args, modules_list, modules_root_dir): args["session_object"] = session return clt - # Save Naming service port name into - # the file args["ns_port_log_file"] - if args.has_key('ns_port_log_file'): - omniorbUserPath = os.getenv("OMNIORB_USER_PATH") - file_name = os.path.join(omniorbUserPath, args["ns_port_log_file"]) - f = open(file_name, "w") - f.write(os.environ['NSPORT']) - f.close() - # Launch Logger Server (optional) # and wait until it is registered in naming service # diff --git a/bin/salomeRunner.py b/bin/salomeRunner.py index f8b8deb40..b96322063 100644 --- a/bin/salomeRunner.py +++ b/bin/salomeRunner.py @@ -86,7 +86,7 @@ class SalomeRunner: # according to current path (initialized from environment files). absoluteAppliPath = os.getenv('ABSOLUTE_APPLI_PATH','') proc = subprocess.Popen(['python', os.path.join(absoluteAppliPath,"bin","salome","salomeRunner.py"), pickle.dumps(self), pickle.dumps(args)], shell=False, close_fds=True) - proc.wait() + proc.communicate() # """Append value to PATH environment variable""" @@ -244,7 +244,8 @@ class SalomeRunner: scriptArgs = getScriptsAndArgs(args) command = formatScriptsAndArgs(scriptArgs) if command: - proc = subprocess.Popen(command, shell=True, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + command = command.split(' ') + proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return proc.communicate() else: absoluteAppliPath = os.getenv('ABSOLUTE_APPLI_PATH','') @@ -264,8 +265,27 @@ class SalomeRunner: # def _killAll(self, args=[]): - from killSalome import killAllPorts - killAllPorts() + absoluteAppliPath = os.getenv('ABSOLUTE_APPLI_PATH','') + try: + import PortManager + ports = PortManager.getBusyPorts() + + from multiprocessing import Process + from killSalomeWithPort import killMyPort + if ports: + import tempfile + for port in ports: + with tempfile.NamedTemporaryFile(): + p = Process(target = killMyPort, args=(port,)) + p.start() + p.join() + except ImportError: + pass + + p = Process(target = killMyPort, args=(2809,)) + p.start() + p.join() + # def _showInfo(self, args=[]): diff --git a/bin/searchFreePort.py b/bin/searchFreePort.py index 83e528734..0bcc470b2 100644 --- a/bin/searchFreePort.py +++ b/bin/searchFreePort.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -*- coding: iso-8859-1 -*- -# Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE +# Copyright (C) 2007-2014 CEA/DEN, EDF R&D, OPEN CASCADE # # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS @@ -123,7 +123,7 @@ def searchFreePort_withoutPortManager(args={}, save_config=1, use_port=None): # # -def searchFreePort_withPortManager(args={}, save_config=1, use_port=None): +def searchFreePort_withPortManager(queue, args={}, save_config=1, use_port=None): from PortManager import getPort port = getPort(use_port) @@ -146,6 +146,10 @@ def searchFreePort_withPortManager(args={}, save_config=1, use_port=None): __setup_config(port, args, save_config) else: print "Unable to obtain port" + + queue.put([os.environ['OMNIORB_CONFIG'], + os.environ['NSPORT'], + os.environ['NSHOST']]) # def searchFreePort(args={}, save_config=1, use_port=None): @@ -154,8 +158,26 @@ def searchFreePort(args={}, save_config=1, use_port=None): Returns first found free port number. """ try: - import PortManager - searchFreePort_withPortManager(args, save_config, use_port) + from multiprocessing import Process, Queue + queue = Queue() + p = Process(target = searchFreePort_withPortManager, args=(queue, args, save_config, use_port,)) + p.start() + info = queue.get() + + os.environ['OMNIORB_CONFIG'] = info[0] + os.environ['NSPORT'] = info[1] + os.environ['NSHOST'] = info[2] + + # Save Naming service port name into + # the file args["ns_port_log_file"] + if args.has_key('ns_port_log_file'): + omniorbUserPath = os.getenv("OMNIORB_USER_PATH") + file_name = os.path.join(omniorbUserPath, args["ns_port_log_file"]) + f = open(file_name, "w") + f.write(os.environ['NSPORT']) + f.close() + + p.join() # this blocks until the process terminates except ImportError: searchFreePort_withoutPortManager(args, save_config, use_port) #