# Copyright (C) 2013-2024  CEA, EDF, OPEN CASCADE
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#

import os
import sys
import glob
import subprocess
import re
import socket
import json

"""
Define a specific exception class to manage exceptions related to SalomeContext
"""
class SalomeContextException(Exception):
    """Report error messages to the user interface of SalomeContext."""
#

def __listDirectory(path):
    """Return every ``*.cfg`` file found under *path*, walking it recursively."""
    allFiles = []
    for root, dirs, files in os.walk(path):
        allFiles += glob.glob(os.path.join(root, '*.cfg'))
    return allFiles
#

def __getConfigFileNamesDefault():
    """Return the ``*.cfg`` files located under ``$ABSOLUTE_APPLI_PATH/env.d``.

    An empty list is returned when ABSOLUTE_APPLI_PATH is unset/empty or when
    the ``env.d`` folder does not exist.
    """
    absoluteAppliPath = os.getenv('ABSOLUTE_APPLI_PATH', '')
    if not absoluteAppliPath:
        return []

    envdDir = absoluteAppliPath + '/env.d'
    if not os.path.isdir(envdDir):
        return []

    return __listDirectory(envdDir)
#

def __getEnvironmentFileNames(args, optionPrefix, checkExistence):
    """Extract the files designated by the *optionPrefix* command-line options.

    Parameters:
      args: the command-line arguments.
      optionPrefix: the option prefix, including its '=' sign (e.g. "--config=").
      checkExistence: if True, names that are neither an existing folder nor an
        existing file are reported as unexisting instead of being returned.

    Returns a (configFileNames, remainingArgs, unexisting) tuple, remainingArgs
    being *args* without the options starting with *optionPrefix*.
    """
    # special case: extra configuration/environment files are provided by user
    # Search for command-line argument(s) <optionPrefix>file1,file2,..., filen
    # Search for command-line argument(s) <optionPrefix>dir1,dir2,..., dirn
    configArgs = [str(x) for x in args if str(x).startswith(optionPrefix)]

    # str() is used on both sides so that a non-string argument is handled the
    # same way when selecting and when filtering
    args = [x for x in args if not str(x).startswith(optionPrefix)]
    # Strip the leading prefix only: a file name may itself contain the prefix text
    allLists = [x[len(optionPrefix):] for x in configArgs]

    configFileNames = []
    unexisting = []
    for currentList in allLists:
        for elt in currentList.split(','):
            if not elt:
                # An empty name (e.g. "--config=" or a trailing comma) would
                # resolve to the current directory and pull every *.cfg below it
                continue
            elt = os.path.realpath(os.path.expanduser(elt))
            if os.path.isdir(elt):
                configFileNames += __listDirectory(elt)
            elif checkExistence and not os.path.isfile(elt):
                unexisting.append(elt)
            else:
                configFileNames.append(elt)

    return configFileNames, args, unexisting
#

def getConfigFileNames(args, checkExistence=False):
    """Return the configuration files to use for the given command-line *args*.

    When no "--config=" option is given, the default files (see
    __getConfigFileNamesDefault) are returned and *args* is left untouched.

    Returns a (configFileNames, remainingArgs, unexisting) tuple.
    """
    configOptionPrefix = "--config="
    hasConfigOption = any(str(x).startswith(configOptionPrefix) for x in args)

    if not hasConfigOption:
        configFileNames, unexist = __getConfigFileNamesDefault(), []
    else:
        # get configuration filenames
        configFileNames, args, unexist = __getEnvironmentFileNames(args, configOptionPrefix, checkExistence)

    return configFileNames, args, unexist
#

def __getScriptPath(scriptName, searchPathList):
    """Locate *scriptName* in the folders of *searchPathList*.

    An absolute name is returned as is, without checking that it exists.
    Otherwise the first ``path/scriptName`` for which either that file or the
    same name with a ".py" suffix exists is returned (the suffix is NOT added
    to the returned value). None is returned when nothing matches.
    """
    scriptName = os.path.expanduser(scriptName)
    if os.path.isabs(scriptName):
        return scriptName

    if not searchPathList:
        return None

    for path in searchPathList:
        fullName = os.path.join(path, scriptName)
        if os.path.isfile(fullName) or os.path.isfile(fullName + ".py"):
            return fullName

    return None
#

class ScriptAndArgs:
    """A command to be run together with its input and output parameters."""
    # script: the command to be run, e.g. python <script.py>
    # args: its input parameters
    # out: its output parameters
    def __init__(self, script=None, args=None, out=None):
        self.script = script
        self.args = args
        self.out = out
    #
    def __repr__(self):
        msg = "\n# Script: %s\n" % self.script
        msg += "     * Input: %s\n" % self.args
        msg += "     * Output: %s\n" % self.out
        return msg
    #
#
class ScriptAndArgsObjectEncoder(json.JSONEncoder):
    """JSON encoder exporting a ScriptAndArgs object as {script: args}."""
    def default(self, obj):
        if isinstance(obj, ScriptAndArgs):
            # to be easily parsed in GUI module (SalomeApp_Application)
            # Do not export output arguments
            return {obj.script: obj.args or []}
        else:
            return json.JSONEncoder.default(self, obj)
#

def getShortAndExtraArgs(args=None):
    """Split *args* at the first "--" item.

    Returns a (short_args, extra_args) tuple; extra_args starts with the "--"
    item itself and is empty when there is no such item.
    """
    if args is None:
        args = []
    try:
        pos = args.index("--")  # raise a ValueError if not found
    except ValueError:
        return args, []
    return args[:pos], args[pos:]  # extra args include "--"
#

def __splitArgsList(text):
    """Split the comma-separated content of an 'args:' option.

    Some items of the list may themselves be lists (but not lists of lists),
    so we can have something like this:
      myscript.py args:['file1','file2'],val1,"done",[1,2,3],[True,False],"ok"
    With such a call, *text* is '[file1,file2],val1,done,[1,2,3],[True,False],ok'
    and has to be split to obtain:
      ['[file1,file2]','val1','done','[1,2,3]','[True,False]','ok']
    """
    if not re.search(r'\[[^\]]*\]', text):
        # a single split is enough
        return [os.path.expanduser(x) for x in text.split(",")]

    pieces = text.split(",")
    # pieces is ['[file1', 'file2]', 'val1', 'done', '[1', '2', '3]', '[True', 'False]', 'ok']
    beginIndices = [i for i, p in enumerate(pieces) if p.startswith('[')]  # [0, 4, 7]
    endIndices = [i for i, p in enumerate(pieces) if p.endswith(']')]      # [1, 6, 8]

    extracted = []
    start = 0
    for lbeg, lend in zip(beginIndices, endIndices):  # [(0, 1), (4, 6), (7, 8)]
        if lbeg > start:
            extracted += pieces[start:lbeg]
        # glue back together the pieces belonging to the same bracketed list
        extracted.append(','.join(pieces[lbeg:lend + 1]))
        start = lend + 1
    if start < len(pieces):
        extracted += pieces[start:]
    return extracted
#

def __hasPythonShebang(path, maxLines=10):
    """Tell whether one of the first *maxLines* lines of *path* matches "#!.*python".

    A file that cannot be opened or decoded as text is reported as not being
    a Python script.
    """
    try:
        with open(path) as stream:
            for _ in range(maxLines):  # read only the first lines
                if re.search("#!.*python", stream.readline()):
                    return True
    except (OSError, UnicodeError):
        pass
    return False
#

# Return an array of ScriptAndArgs objects
def getScriptsAndArgs(args=None, searchPathList=None):
    """Parse command-line *args* into a list of ScriptAndArgs objects.

    Syntax of args:
      script.py [args:a1,a2=val,an] [out:o1,o2] ... [-- program [options] [arguments]]

    Parameters:
      args: the command-line arguments (defaults to an empty list).
      searchPathList: folders in which scripts are searched when they are not
        found as given (defaults to sys.path).

    A script is prefixed with "@PYTHONBIN@ " when it follows a "python..."
    item, when it is not executable, or when it is an executable ".py" file
    without a Python shebang in its first lines. Other executable scripts are
    kept as is.

    Raises SalomeContextException when an 'args:' or 'out:' list does not
    follow a script, or when a ".py" script cannot be found.
    """
    if args is None:
        args = []
    short_args, extra_args = getShortAndExtraArgs(args)
    args = short_args

    if searchPathList is None:
        searchPathList = sys.path

    # Syntax of args: script.py [args:a1,a2=val,an] ... script.py [args:a1,a2=val,an]
    scriptArgs = []
    currentKey = None
    argsPrefix = "args:"
    outPrefix = "out:"
    callPython = False
    afterArgs = False
    currentScript = None

    for rawArg in args:
        elt = os.path.expanduser(rawArg)
        isDriver = (elt == "driver")  # special case for YACS scheme execution

        if elt.startswith(argsPrefix):
            if not currentKey or callPython:
                raise SalomeContextException("args list must follow corresponding script file in command line.")
            # Strip the leading prefix only: a value may itself contain "args:"
            scriptArgs[-1].args = __splitArgsList(elt[len(argsPrefix):])
            currentKey = None
            callPython = False
            afterArgs = True
        elif elt.startswith(outPrefix):
            if (not currentKey and not afterArgs) or callPython:
                raise SalomeContextException("out list must follow both corresponding script file and its args in command line.")
            outList = elt[len(outPrefix):]
            scriptArgs[-1].out = [os.path.expanduser(x) for x in outList.split(",")]
            currentKey = None
            callPython = False
            afterArgs = False
        elif elt.startswith("python"):
            # NOTE: any item starting with "python" is taken as the interpreter
            # keyword, including a script whose name starts that way
            callPython = True
            afterArgs = False
        else:
            file_extension = os.path.splitext(elt)[-1]
            if not os.path.isfile(elt) and not os.path.isfile(elt + ".py"):
                eltInSearchPath = __getScriptPath(elt, searchPathList)
                if eltInSearchPath is None or (not os.path.isfile(eltInSearchPath) and not os.path.isfile(eltInSearchPath + ".py")):
                    if file_extension == ".py":
                        raise SalomeContextException("Script not found: %s" % elt)
                    # Not a file: keep the item as is (currentKey is left untouched)
                    scriptArgs.append(ScriptAndArgs(script=elt))
                    continue
                elt = eltInSearchPath

            # For a ".hdf" item currentScript keeps the value of the previous item
            if file_extension != ".hdf":
                if file_extension == ".py" or isDriver:
                    currentScript = os.path.abspath(elt)
                elif os.path.isfile(elt + ".py"):
                    currentScript = os.path.abspath(elt + ".py")
                else:
                    currentScript = os.path.abspath(elt)  # python script not necessary has .py extension

            if currentScript and callPython:
                currentKey = "@PYTHONBIN@ " + currentScript
                scriptArgs.append(ScriptAndArgs(script=currentKey))
                callPython = False
            elif currentScript:
                script_extension = os.path.splitext(currentScript)[-1]
                if isDriver:
                    currentKey = currentScript
                    callPython = False
                elif not os.access(currentScript, os.X_OK):
                    currentKey = "@PYTHONBIN@ " + currentScript
                elif script_extension == ".py" and not __hasPythonShebang(currentScript):
                    # Executable .py file that does not say how to run itself
                    currentKey = "@PYTHONBIN@ " + currentScript
                else:
                    currentKey = currentScript
                scriptArgs.append(ScriptAndArgs(script=currentKey))
            # CLOSE elif currentScript
            afterArgs = False
    # end for loop

    if len(extra_args) > 1:  # syntax: -- program [options] [arguments]
        command = extra_args[1]
        command_args = extra_args[2:]
        scriptArgs.append(ScriptAndArgs(script=command, args=command_args))

    return scriptArgs
#

# Formatting scripts and args as a Bash-like command-line:
# script1.py [args] ; script2.py [args] ; ...
# scriptArgs is a list of ScriptAndArgs objects; their output parameters are omitted
def formatScriptsAndArgs(scriptArgs=None, escapeSpaces=False):
    """Format scripts and their input arguments as a single command-line.

    Parameters:
      scriptArgs: a list of objects having 'script' and 'args' attributes
        (ScriptAndArgs objects); None gives an empty string.
      escapeSpaces: if True, protect the spaces contained in the script path:
        the path is enclosed in double quotes on win32, and its spaces are
        backslash-escaped elsewhere. The space separating a leading
        "@PYTHONBIN@ " interpreter prefix from the script path is kept as is.

    Returns the commands joined with " ; " (" & " on win32).
    """
    if scriptArgs is None:
        return ""

    onWindows = (sys.platform == "win32")
    interpreterPrefix = "@PYTHONBIN@ "

    commands = []
    for sa_obj in scriptArgs:
        cmd = sa_obj.script
        if escapeSpaces and cmd:
            # Only the script path must be protected. A command without the
            # interpreter prefix is a path as a whole: its first space must be
            # escaped too (and, on win32, its quotes must be balanced).
            if cmd.startswith(interpreterPrefix):
                prefix, path = interpreterPrefix, cmd[len(interpreterPrefix):]
            else:
                prefix, path = "", cmd
            if onWindows:
                path = '"' + path + '"'
            else:
                path = path.replace(' ', '\\ ')
            cmd = prefix + path
        if sa_obj.args:
            cmd = " ".join([cmd] + sa_obj.args)
        commands.append(cmd)

    sep = " & " if onWindows else " ; "
    return sep.join(["%s" % x for x in commands])
#

# Ensure OMNIORB_USER_PATH is defined. This variable refers to a folder in which
# SALOME will write omniOrb configuration files.
# If OMNIORB_USER_PATH is already set, only checks write access to associated directory ;
# an exception is raised if check fails. It allows users for choosing a specific folder.
# Else the function sets OMNIORB_USER_PATH this way:
# - If the system temporary directory can be used and is writable, OMNIORB_USER_PATH
#   is set to that directory (to handle application concurrency).
# - Else, if APPLI environment variable is set, and if ${HOME}/${APPLI}/USERS points
#   at an existing folder with write access, then OMNIORB_USER_PATH is set to
#   ${HOME}/${APPLI}/USERS.
#   This is the case if SALOME virtual application has been created using
#   appli_gen.py script.
# - Else OMNIORB_USER_PATH is set to user home directory.
def setOmniOrbUserPath():
    """Make sure the OMNIORB_USER_PATH environment variable is defined.

    If the variable is already set, only write access to the folder is
    checked and an Exception is raised when the check fails. Otherwise the
    variable is set to the system temporary directory or, if that directory
    cannot be used, to ${HOME}/${APPLI}/USERS when it is writable, else to
    the user home directory.
    """
    userPath = os.getenv("OMNIORB_USER_PATH")
    if userPath:
        # The user chose a folder: it must be writable
        if not os.access(userPath, os.W_OK):
            raise Exception("Unable to get write access to directory: %s"%userPath)
        return

    # Must be in /tmp (or equivalent) to handle application concurrency
    try:
        import tempfile
        # A temporary file is created only to learn in which folder it lands
        with tempfile.NamedTemporaryFile() as probe:
            tmpFolder = os.path.dirname(probe.name)
        if not os.access(tmpFolder, os.W_OK):
            raise Exception("Unable to get write access to directory: %s"%tmpFolder)
        os.environ["OMNIORB_USER_PATH"] = tmpFolder
    except Exception:
        # Temporary folder is not usable: fall back to the user home directory,
        # or to the USERS folder of the application when it is writable
        home = os.path.realpath(os.path.expanduser('~'))
        fallback = home
        appli = os.getenv("APPLI")
        if appli:
            usersFolder = os.path.join(home, appli, "USERS")
            if os.access(usersFolder, os.W_OK):
                fallback = usersFolder
        os.environ["OMNIORB_USER_PATH"] = fallback
#

def getHostname():
    """Return the local host name, without its domain part."""
    fullName = socket.gethostname()
    return fullName.split('.')[0]
#