1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # File : SALOME_PyNode.py
22 # Author : Christian CAREMOLI, EDF
36 from SALOME_ContainerHelper import ScriptExecInfo
# Key under which the hosting container object is injected into every node's execution globals.
38 MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
# Key under which the per-session performance logger is injected into script globals (see PyScriptNode_Abstract_i.beginOfCurrentExecutionSession).
40 MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
# Reference-counting CORBA servant base. UnRegister deactivates the servant in its POA
# once no more references are held.
# NOTE(review): this listing is an elided dump -- the "def" headers of Register/UnRegister/
# Destroy/__del__ are missing between the lines below; confirm against the full file.
42 class Generic(SALOME__POA.GenericObj):
43 """A Python implementation of the GenericObj CORBA IDL"""
44 def __init__(self,poa):
49 #print("Register called : %d"%self.cnt)
53 #print("UnRegister called : %d"%self.cnt)
# Deactivate this servant in the POA that created it.
56 oid=self.poa.servant_to_id(self)
57 self.poa.deactivate_object(oid)
# Destroy() is kept only for backward compatibility, as the message below states.
60 print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
64 #print("Destuctor called")
# Servant executing a single Python function (funcName) found in a compiled code context.
# NOTE(review): elided dump -- several lines (e.g. self.context initialisation between 76 and 78,
# try/except headers around 97-100 and 105-111) are missing here.
67 class PyNode_i (Engines__POA.PyNode,Generic):
68 """The implementation of the PyNode CORBA IDL"""
69 def __init__(self, nodeName,code,poa,my_container):
70 """Initialize the node : compilation in the local context"""
71 Generic.__init__(self,poa)
72 self.nodeName=nodeName
74 self.my_container=my_container._container
# Register the source in linecache so tracebacks can show the node's code lines.
75 linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
76 ccode=compile(code,nodeName,'exec')
78 self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
79 exec(ccode, self.context)
81 def getContainer(self):
82 return self.my_container
# Inject a client-provided variable (pickled bytes) into the execution context.
90 def defineNewCustomVar(self,varName,valueOfVar):
91 self.context[varName] = pickle.loads(valueOfVar)
94 def executeAnotherPieceOfCode(self,code):
95 """Called for initialization of container lodging self."""
97 ccode=compile(code,self.nodeName,'exec')
98 exec(ccode, self.context)
100 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
102 def execute(self,funcName,argsin):
103 """Execute the function funcName found in local context with pickled args (argsin)"""
105 argsin,kws=pickle.loads(argsin)
106 func=self.context[funcName]
107 argsout=func(*argsin,**kws)
# Result is re-pickled with the highest protocol (-1) for the caller.
108 argsout=pickle.dumps(argsout,-1)
111 exc_typ,exc_val,exc_fr=sys.exc_info()
112 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
113 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
# Servant exposing a bytes buffer to remote clients by slices (see SeqByteReceiver client side).
# NOTE(review): elided dump -- the "def getSize(self):" header before line 121 is missing here.
115 class SenderByte_i(SALOME__POA.SenderByte,Generic):
116 def __init__(self,poa,bytesToSend):
117 Generic.__init__(self,poa)
118 self.bytesToSend = bytesToSend
121 return len(self.bytesToSend)
# Return the [n1:n2) slice of the held buffer.
123 def sendPart(self,n1,n2):
124 return self.bytesToSend[n1:n2]
# Env var holding the directory where big objects are dumped to disk.
126 SALOME_FILE_BIG_OBJ_DIR = "SALOME_FILE_BIG_OBJ_DIR"
# Env var overriding the size threshold (bytes) that triggers the on-disk proxy mechanism.
128 SALOME_BIG_OBJ_ON_DISK_THRES_VAR = "SALOME_BIG_OBJ_ON_DISK_THRES"
# Default threshold: 50 MB.
131 SALOME_BIG_OBJ_ON_DISK_THRES_DFT = 50000000
# Module-level reference counter: proxy file name -> number of live references
# (maintained by IncrRefInFile / DecrRefInFile below).
133 DicoForProxyFile = { }
# Measure the size of an open buffered reader by seeking to the end, then restoring
# the previous position. NOTE(review): elided dump -- the lines saving the current
# position (presumably pos = f.tell()) and returning the measured size are missing here.
135 def GetSizeOfBufferedReader(f):
137 This method returns in bytes size of a file openned.
141 f (io.IOBase): buffered reader returned by open
149 f.seek(0,io.SEEK_END)
151 f.seek(pos,io.SEEK_SET)
# Load the pickled object stored in fname, reporting its on-disk size and name to the
# optional visitor. NOTE(review): elided dump -- the "if visitor:" guard and the final
# unpickle/return line are missing here.
154 def GetObjectFromFile(fname, visitor = None):
155 with open(fname,"rb") as f:
157 visitor.setHDDMem( GetSizeOfBufferedReader(f) )
158 visitor.setFileName( fname )
# Write the already-serialized object (bytes) into fname.
# NOTE(review): elided dump -- the write line inside the with-block is missing here.
162 def DumpInFile(obj,fname):
163 with open(fname,"wb") as f:
# Increment the module-level reference count of a proxy file. The first explicit
# increment initialises the count to 2. NOTE(review): the "else:" line between 168
# and 170 is elided in this dump.
166 def IncrRefInFile(fname):
167 if fname in DicoForProxyFile:
168 DicoForProxyFile[fname] += 1
170 DicoForProxyFile[fname] = 2
# Decrement the reference count of a proxy file; when exhausted the bookkeeping entry
# is deleted and, if the file still exists, it is removed from disk.
# NOTE(review): elided dump -- the early-return branch, the "if cnt <= ..." guard and
# the os.unlink call are missing between the lines below.
173 def DecrRefInFile(fname):
174 if fname not in DicoForProxyFile:
177 cnt = DicoForProxyFile[fname]
178 DicoForProxyFile[fname] -= 1
180 del DicoForProxyFile[fname]
182 if os.path.exists(fname):
# Return the byte-size threshold above which objects are proxied on disk:
# the env var SALOME_BIG_OBJ_ON_DISK_THRES if set, otherwise the 50 MB default.
186 def GetBigObjectOnDiskThreshold():
188 if SALOME_BIG_OBJ_ON_DISK_THRES_VAR in os.environ:
189 return int( os.environ[SALOME_BIG_OBJ_ON_DISK_THRES_VAR] )
191 return SALOME_BIG_OBJ_ON_DISK_THRES_DFT
# Decide whether a payload of sizeInByte should go through the on-disk proxy mechanism.
# NOTE(review): lines 195-197 are elided in this dump (presumably docstring / guard logic).
193 def ActivateProxyMecanismOrNot( sizeInByte ):
194 thres = GetBigObjectOnDiskThreshold()
198 return sizeInByte > thres
def GetBigObjectDirectory():
  """Return the directory used to dump big objects on disk.

  The directory is read from the SALOME_FILE_BIG_OBJ_DIR environment variable,
  with ``~`` and ``$VAR`` expansion applied. Raises RuntimeError when the
  variable is not set.
  """
  dirName = os.environ.get( SALOME_FILE_BIG_OBJ_DIR )
  if dirName is None:
    raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
  return os.path.expanduser( os.path.expandvars( dirName ) )
# Generate a unique .pckl file name inside the big-object directory using
# tempfile.NamedTemporaryFile. NOTE(review): elided dump -- the docstring close and
# the line returning the generated name (presumably f.name) are missing here.
206 def GetBigObjectFileName():
208 Return a filename in the most secure manner (see tempfile documentation)
211 with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
# Base class of on-disk proxies: holds the dump file name, a client/server ownership
# flag (_destroy) and ref-count plumbing delegating to Incr/DecrRefInFile.
# NOTE(review): elided dump -- many method headers (incrRef/decrRef/__del__/__float__/
# __int__/__str__ ...) are missing between the lines below.
215 class BigObjectOnDiskBase:
216 def __init__(self, fileName, objSerialized):
218 :param fileName: the file used to dump into.
219 :param objSerialized: the object in pickeled form
220 :type objSerialized: bytes
222 self._filename = fileName
223 # attribute _destroy is here to tell client side or server side
224 # only client side can be with _destroy set to True. server side due to risk of concurrency
225 # so pickled form of self must be done with this attribute set to False.
226 self._destroy = False
227 self.__dumpIntoFile(objSerialized)
229 def getDestroyStatus(self):
234 IncrRefInFile( self._filename )
236 # should never happen !
# NOTE(review): the RuntimeError below is constructed but never raised -- probably a
# missing "raise" in the original; confirm against upstream before fixing.
237 RuntimeError("Invalid call to incrRef !")
241 DecrRefInFile( self._filename )
243 # should never happen !
# NOTE(review): same here -- RuntimeError instantiated without "raise".
244 RuntimeError("Invalid call to decrRef !")
246 def unlinkOnDestructor(self):
249 def doNotTouchFile(self):
251 Method called slave side. The life cycle management of file is client side not slave side.
253 self._destroy = False
257 DecrRefInFile( self._filename )
259 def getFileName(self):
260 return self._filename
262 def __dumpIntoFile(self, objSerialized):
263 DumpInFile( objSerialized, self._filename )
# Reload the proxied object from disk, optionally reporting sizes to the visitor.
265 def get(self, visitor = None):
266 obj = GetObjectFromFile( self._filename, visitor )
270 return float( self.get() )
273 return int( self.get() )
277 if isinstance(obj,str):
280 raise RuntimeError("Not a string")
class BigObjectOnDisk(BigObjectOnDiskBase):
  """On-disk proxy for a scalar (non-sequence) big object."""
  def __init__(self, fileName, objSerialized):
    # Delegate dump and bookkeeping entirely to the base class.
    super().__init__(fileName, objSerialized)
# Lazy proxy for one element of a proxied sequence: re-reads the whole sequence from
# disk and indexes it. __init__ deliberately does NOT call the base __init__ (no re-dump,
# no refcount). NOTE(review): elided dump -- the self._pos assignment (line 290) and the
# bodies of __getitem__/__len__ headers are missing here.
286 class BigObjectOnDiskListElement(BigObjectOnDiskBase):
287 def __init__(self, pos, length, fileName):
288 self._filename = fileName
289 self._destroy = False
291 self._length = length
293 def get(self, visitor = None):
294 fullObj = BigObjectOnDiskBase.get(self, visitor)
295 return fullObj[ self._pos ]
297 def __getitem__(self, i):
301 return len(self.get())
# Common base for proxied list/tuple: stores the sequence length and yields lazy
# per-element proxies on indexing. NOTE(review): elided dump -- lines 310-313
# (presumably __len__ and/or blank lines) are missing here.
303 class BigObjectOnDiskSequence(BigObjectOnDiskBase):
304 def __init__(self, length, fileName, objSerialized):
305 BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
306 self._length = length
308 def __getitem__(self, i):
309 return BigObjectOnDiskListElement(i, self._length, self.getFileName())
class BigObjectOnDiskList(BigObjectOnDiskSequence):
  """On-disk proxy for a big list."""
  def __init__(self, length, fileName, objSerialized):
    super().__init__(length, fileName, objSerialized)
class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
  """On-disk proxy for a big tuple."""
  def __init__(self, length, fileName, objSerialized):
    super().__init__(length, fileName, objSerialized)
# Build the appropriate on-disk proxy (list/tuple/scalar flavour) for obj, dumping its
# pickled form into a fresh big-object file. NOTE(review): elided dump -- the
# "if visitor:" guard (before 340), the "else:" (before 347) and the final
# "return proxyObj" are missing here.
322 def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
324 This method return a proxy instance of pickled form of object given in input.
328 obj (pickelable type) : object to be proxified
329 pickleObjInit (bytes) : Optionnal. Original pickeled form of object to be proxyfied if already computed. If not this method generate it
333 BigObjectOnDiskBase: proxy instance
335 pickleObj = pickleObjInit
336 if pickleObj is None:
337 pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
338 fileName = GetBigObjectFileName()
340 visitor.setHDDMem( len(pickleObj) )
341 visitor.setFileName(fileName)
342 if isinstance( obj, list):
343 proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
344 elif isinstance( obj, tuple):
345 proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
347 proxyObj = BigObjectOnDisk( fileName , pickleObj )
# Pickle obj; if the pickle is larger than the big-object threshold, replace it by a
# pickled on-disk proxy. NOTE(review): elided dump -- the two return statements
# (small-object and proxy paths) are missing here.
350 def SpoolPickleObject( obj, visitor = None ):
352 with InOutputObjVisitorCM(visitor) as v:
353 pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
354 if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
357 proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
358 pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
361 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
# Remote-mode unproxy: resolve BigObjectOnDiskBase proxies (recursively through lists)
# into real values, reporting HDD usage to the visitor. NOTE(review): elided dump --
# the proxy-resolution return, the retObj initialisation and the final returns are
# missing between the lines below.
363 def UnProxyObjectSimple( obj, visitor = None ):
365 Method to be called in Remote mode. Alterate the obj _status attribute.
366 Because the slave process does not participate in the reference counting
370 visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
373 with InOutputObjVisitorCM(visitor) as v:
374 logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
375 if isinstance(obj,BigObjectOnDiskBase):
378 elif isinstance( obj, list):
381 retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
# Local-mode unproxy: same resolution as UnProxyObjectSimple but without touching the
# file reference counting. NOTE(review): elided dump -- branch bodies and return
# statements are missing between the lines below.
386 def UnProxyObjectSimpleLocal( obj ):
388 Method to be called in Local mode. Do not alterate the PyObj counter
390 if isinstance(obj,BigObjectOnDiskBase):
392 elif isinstance( obj, list):
395 retObj.append( UnProxyObjectSimpleLocal(elt) )
# NOTE(review): interior of class FileHolder -- the "class FileHolder:" header (line 400)
# and the property/def lines before 405 are elided in this dump. Line 405 is presumably
# the body of a "filename" property; confirm against the full file.
401 def __init__(self, fileName):
402 self._filename = fileName
405 return self._filename
# FileHolder variant that removes its file from disk when the instance dies.
# NOTE(review): elided dump -- the "def __del__(self):" header before lines 412-413 is
# missing here (those lines presumably form its body); confirm against the full file.
407 class FileDeleter(FileHolder):
408 def __init__(self, fileName):
409 super().__init__( fileName )
412 if os.path.exists( self._filename ):
413 os.unlink( self._filename )
# Value object describing a launched monitoring subprocess: the generated python file,
# the polling interval, the output file and the subprocess pid.
# NOTE(review): elided dump -- the @property decorators, the pid getter and the pid
# assignment in __init__ are missing between the lines below.
415 class MonitoringInfo:
416 def __init__(self, pyFileName, intervalInMs, outFileName, pid):
417 self._py_file_name = pyFileName
418 self._interval_in_ms = intervalInMs
419 self._out_file_name = outFileName
423 def pyFileName(self):
424 return self._py_file_name
# pid setter: stores the subprocess pid once the monitoring has been launched.
431 def pid(self, value):
435 def outFileName(self):
436 return self._out_file_name
439 def intervalInMs(self):
440 return self._interval_in_ms
# Generate (not run) a standalone python script that loops forever, sampling inode count
# (find | wc -l) and directory size (du -sh) of dirNameToInspect every intervalInMs, and
# appending timestamp/inodes/size triplets to an output text file. Returns a
# MonitoringInfo wrapping the generated files (pid is filled later at launch).
# NOTE(review): elided dump -- docstring pieces, the tempPyFile assignment and parts of
# the generated template (lines 469-495 form one big format-template string; no
# annotations are inserted inside it) are missing here.
442 def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
444 This method loops indefinitely every intervalInMs milliseconds to scan
445 number of inodes and size of content recursively included into the in input directory.
450 outFileName (str) : name of file inside the results will be written. If None a new file is generated
452 See also CPUMemoryMonitoring
456 dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
460 # outFileNameSave stores the content of outFileName during phase of dumping
461 with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
462 outFileNameSave = f.name
463 with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
465 tempOutFile = outFileName
466 if tempOutFile is None:
467 tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
468 with open(tempPyFile,"w") as f:
470 import subprocess as sp
475 with open("{tempOutFile}","a") as f:
476 f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
477 f.write( "{{}}\\n".format( "{intervalInMs}" ) )
481 nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]), ), shell = True).decode().strip()
486 st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
487 szOfDirStr = re.split("[\s]+",st)[0]
490 f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
491 f.write( "{{}}\\n".format( str( nbinodes ) ) )
492 f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
494 time.sleep( {intervalInMs} / 1000.0 )
495 """.format( **locals()))
496 logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
# The generated script is wrapped in a FileDeleter so it is cleaned up automatically;
# a user-supplied output file is only held (FileHolder), never deleted.
497 pyFileName = FileDeleter( tempPyFile )
498 if outFileName is None:
499 outFileName = FileDeleter( tempOutFile )
501 outFileName = FileHolder(outFileName)
502 return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
# Generate (not run) a standalone psutil-based python script sampling the CPU percent
# and RSS memory of the current process every intervalInMs, writing pairs of lines to an
# output file. Returns a MonitoringInfo (pid filled later at launch).
# NOTE(review): elided dump -- docstring pieces, tempPyFile assignment and parts of the
# generated template (lines 528-539 form one format-template string; nothing is inserted
# inside it) are missing here.
504 def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
506 Launch a subprocess monitoring self process.
507 This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
508 CPU usage and RSS memory of the calling process.
509 Communication between subprocess and self is done by file.
513 outFileName (str) : name of file inside the results will be written. If None a new file is generated
515 See also FileSystemMonitoring
# Inner helper: build the generated .py file and its output file.
518 def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
521 with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
523 tempOutFile = outFileName
524 if tempOutFile is None:
525 tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
527 with open(tempPyFile,"w") as f:
528 f.write("""import psutil
530 process = psutil.Process( pid )
532 with open("{}","a") as f:
533 f.write( "{{}}\\n".format( "{}" ) )
535 f.write( "{{}}\\n".format( str( process.cpu_percent() ) ) )
536 f.write( "{{}}\\n".format( str( process.memory_info().rss ) ) )
538 time.sleep( {} / 1000.0 )
539 """.format(pid, tempOutFile, intervalInMs, intervalInMs))
540 if outFileName is None:
541 autoOutFile = FileDeleter(tempOutFile)
543 autoOutFile = FileHolder(tempOutFile)
544 return FileDeleter(tempPyFile),autoOutFile
545 pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
546 return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
# Context manager that launches a monitoring script (via KernelBasis.LaunchMonitoring)
# on __enter__ and stops it (StopMonitoring) on __exit__.
# NOTE(review): elided dump -- the "def __enter__(self):" header before line 558 is
# missing here.
548 class GenericPythonMonitoringLauncherCtxMgr:
549 def __init__(self, monitoringParams):
553 monitoringParams (MonitoringInfo)
555 self._monitoring_params = monitoringParams
558 pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
559 self._monitoring_params.pid = pid
560 return self._monitoring_params
562 def __exit__(self,exctype, exc, tb):
563 StopMonitoring( self._monitoring_params )
# Stop the monitoring subprocess identified by monitoringInfo.pid via KernelBasis.
565 def StopMonitoring( monitoringInfo ):
567 Kill monitoring subprocess.
571 monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
574 KernelBasis.StopMonitoring(monitoringInfo.pid)
# NOTE(review): interior of class CPUMemInfo -- the class header (line 576) and several
# def/property/@property lines are elided in this dump. Holds the sampling interval and
# (cpu, rss) pairs read back from a CPUMemoryMonitoring output file.
577 def __init__(self, intervalInMs, cpu, mem_rss):
582 cpu (list<float>) CPU usage
583 mem_rss (list<int>) rss memory usage
585 self._interval_in_ms = intervalInMs
# Pair up cpu and rss samples; zip truncates to the shorter list.
586 self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
588 st = """Interval in ms : {self.intervalInMs}
590 """.format( **locals() )
593 def intervalInMs(self):
594 return self._interval_in_ms
598 list of triplets. First param of pair is cpu usage
599 Second param of pair is memory usage
# Parse a CPUMemoryMonitoring output file: first line is the interval, then alternating
# cpu / rss lines. Missing file yields empty data.
# NOTE(review): elided dump -- line 604 (presumably "intervalInMs = 0", needed for the
# missing-file path at 616) and line 607 are missing here; confirm against the full file.
603 def ReadCPUMemInfoInternal( fileName ):
605 cpu = [] ; mem_rss = []
606 if os.path.exists( fileName ):
608 with open(fileName, "r") as f:
609 coarseData = [ elt.strip() for elt in f.readlines() ]
610 intervalInMs = int( coarseData[0] )
611 coarseData = coarseData[1:]
# Even lines are cpu percentages, odd lines are rss bytes.
612 cpu = [float(elt) for elt in coarseData[::2]]
613 mem_rss = [ int(elt) for elt in coarseData[1::2]]
616 return CPUMemInfo(intervalInMs,cpu,mem_rss)
# Convenience wrapper: read CPU/RSS samples from the output file referenced by a
# MonitoringInfo instance.
618 def ReadCPUMemInfo( monitoringInfo ):
620 Retrieve CPU/Mem data of monitoring.
624 monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
630 return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
# NOTE(review): interior of class InodeSizeInfo -- the class header (line 632) and
# several def/@property lines are elided in this dump. Holds the monitored directory,
# sampling interval and (timestamp, #inodes, size) triplets.
633 def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
637 timeStamps (list<datetimestruct>)
639 volumeOfDir (list<str>)
641 self._dir_name_monitored = dirNameMonitored
642 self._interval_in_ms = intervalInMs
643 self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
# NOTE(review): the "$" before {self.intervalInMs} in the template below ends up
# literally in the repr output -- looks like a typo in the original; confirm upstream.
645 st = """Filename monitored : {self.dirNameMonitored}
646 Interval in ms : ${self.intervalInMs}
648 """.format( **locals() )
651 def dirNameMonitored(self):
652 return self._dir_name_monitored
654 def intervalInMs(self):
655 return self._interval_in_ms
659 list of triplets. First param of triplet is datetimestruct
660 Second param of triplet is #inodes.
661 Thirst param of triplet is size.
# Parse a FileSystemMonitoring output file: directory name, interval, then repeating
# timestamp / #inodes / size triplets (one value per line).
665 def ReadInodeSizeInfoInternal( fileName ):
668 with open(fileName, "r") as f:
669 coarseData = [ elt.strip() for elt in f.readlines() ]
670 dirNameMonitored = coarseData[0] ; intervalInMs = int( coarseData[1] ) ; coarseData = coarseData[2:]
671 tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
672 nbInodes = [int(elt) for elt in coarseData[1::3]]
673 volumeOfDir = coarseData[2::3]
674 return InodeSizeInfo(dirNameMonitored,intervalInMs,tss,nbInodes,volumeOfDir)
# Convenience wrapper: read inode/size samples from the output file referenced by a
# MonitoringInfo instance.
676 def ReadInodeSizeInfo( monitoringInfo ):
678 Retrieve nb of inodes and size of monitoring
682 monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
688 return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
# Client-side counterpart of SenderByte_i: pulls a remote bytes buffer either in one
# shot or, above CHUNK_SIZE, by successive sendPart slices.
# NOTE(review): elided dump -- the __init__ body (presumably self._obj = sender), the
# __del__ header above 696 and the "def data(self):" header above 699 are missing here.
690 class SeqByteReceiver:
691 # 2GB limit to trigger split into chunks
692 CHUNK_SIZE = 2000000000
693 def __init__(self,sender):
696 self._obj.UnRegister()
699 size = self._obj.getSize()
700 if size <= SeqByteReceiver.CHUNK_SIZE:
701 return self.fetchOneShot( size )
703 return self.fetchByChunks( size )
704 def fetchOneShot(self,size):
705 return self._obj.sendPart(0,size)
706 def fetchByChunks(self,size):
708 To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
710 data_for_split_case = bytes(0)
# Effective chunk size is CHUNK_SIZE/8 (250MB) per round trip.
711 EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
712 iStart = 0 ; iEnd = EFF_CHUNK_SIZE
713 while iStart!=iEnd and iEnd <= size:
714 part = self._obj.sendPart(iStart,iEnd)
715 data_for_split_case = bytes(0).join( [data_for_split_case,part] )
716 iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
717 return data_for_split_case
# Template of the driver script run in the crash-proof subprocess (see
# ExecCrashProofGeneric): it rebuilds the remote log handle from an IOR, loads the
# pickled input context, executes the user code, filters the context down to the
# requested output keys and pickles it back for the parent process.
# NOTE(review): lines 719-740 below are the interior of one triple-quoted string
# (elided dump); no annotations may be inserted inside it.
719 FinalCode = """import pickle
720 from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
723 orb = CORBA.ORB_init([''])
726 outputFileName = "{}"
728 exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
729 with open(inputFileName,"rb") as f:
730 context = pickle.load( f )
731 context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
732 with open(codeFileName,"r") as f:
735 exec( code , context )
736 # filter part of context to be exported to father process
737 context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
739 with open(outputFileName,"wb") as f:
740 pickle.dump( context, f )
# Bookkeeping for the four files involved in one crash-proof subprocess execution
# (driver script, user code, input context pickle, output context pickle), plus
# cleanup helpers and user-facing replay/clean messages.
# NOTE(review): elided dump -- the "result" property header (749-751), @property lines
# for replayCmd/cleanOperations and several branch lines of strDependingOnReturnCode
# are missing below; the triple-quoted f-strings at 782-786 and 790-792 are kept intact.
743 class PythonFunctionEvaluatorParams:
744 def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
745 self._main_filename = mainFileName
746 self._code_filename = codeFileName
747 self._in_context_filename = inContextFileName
748 self._out_context_filename = outContextFileName
# Load the output context produced by the subprocess.
752 with open(self._out_context_filename,"rb") as f:
753 return pickle.load( f )
# Success path: remove every temporary file.
754 def destroyOnOK(self):
755 for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
756 if os.path.exists( fileToDestroy ):
757 os.unlink( fileToDestroy )
758 def destroyOnKO(self, containerRef):
760 Called in the context of failure with replay mode activated
762 for fileToDestroy in [self._out_context_filename]:
763 if os.path.exists( fileToDestroy ):
764 os.unlink( fileToDestroy )
765 # register to container files group associated to the
766 containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
769 return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))
772 def cleanOperations(self):
774 return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )
# Build the failure message; wording depends on whether replay files are kept.
776 def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
778 return f"return with non zero code ({returnCode})"
781 if keepFilesToReplay:
782 return f"""return with non zero code ({returnCode})
784 Looks like a hard crash as returnCode {returnCode} != 0
786 {self.cleanOperations}
790 return f"""return with non zero code ({returnCode})
792 Looks like a hard crash as returnCode {returnCode} != 0
# Run `code` in a separate python3 subprocess so that a hard crash (segfault, abort)
# of the user code cannot kill the hosting container process. The context travels by
# pickle files; the subprocess driver is generated from the FinalCode template.
# NOTE(review): elided dump -- try/except headers, the "if returnCode == 0:" branch
# header and the success "return ret" are missing between the lines below.
796 def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay ):
798 Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.
803 code (str) : python code to be executed using context
804 context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
805 outargsname (list<str>) : list of arguments to be exported
806 containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
807 instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
808 keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
813 ScriptExecInfo : instance serverside
818 context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
822 import subprocess as sp
# Inner helper: materialize code/context/driver files, launch python3 and collect output.
825 def InternalExecResistant( code, context, outargsname):
826 orb = CORBA.ORB_init([''])
827 iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
829 EXEC_CODE_FNAME_PXF = "execsafe_"
# Derive the random unique part of the generated code file name so sibling files share it.
830 def RetrieveUniquePartFromPfx( fname ):
831 return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
832 with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
835 codeFileName = os.path.basename( codeFd.name )
836 contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
837 with open(contextFileName,"wb") as contextFd:
838 pickle.dump( context, contextFd)
839 resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
840 mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
841 with open(mainExecFileName,"w") as f:
842 f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
843 p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
844 stdout, stderr = p.communicate()
845 returnCode = p.returncode
846 return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
847 ret = instanceOfLogOfCurrentSession._current_instance
848 returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
# Forward the child's stdout/stderr so logs appear in the container output.
849 stdout = stdout.decode()
850 stderr = stderr.decode()
851 sys.stdout.write( stdout ) ; sys.stdout.flush()
852 sys.stderr.write( stderr ) ; sys.stderr.flush()
854 pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
855 if len(pcklData) > 0:
856 ret = pickle.loads( pcklData )
# Import the subprocess output context back into the caller's context on success.
857 context.update( evParams.result )
858 evParams.destroyOnOK()
861 if keepFilesToReplay:
862 evParams.destroyOnKO( containerRef )
864 evParams.destroyOnOK()
865 raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """Crash-proof execution that keeps all files needed to replay the failure post mortem."""
  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = True)
def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """Crash-proof execution that only reports errors; replay files are destroyed."""
  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = False)
def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """Execute *code* directly in the current process inside *context*.

  outargsname and containerRef are unused here; they are kept so every executor
  (local and crash-proof flavours) shares one call signature. Returns the
  _current_instance carried by instanceOfLogOfCurrentSession.
  """
  exec(code, context)
  return instanceOfLogOfCurrentSession._current_instance
# Abstract base of per-session execution loggers: owns a ScriptExecInfo instance and
# lets callers attach key/value info to it. NOTE(review): elided dump -- the __init__
# header (878) and lines 880/883-884 are missing here.
877 class LogOfCurrentExecutionSessionAbs(abc.ABC):
879 self._current_instance = ScriptExecInfo()
# Record one piece of info as an attribute of the current ScriptExecInfo.
881 def addInfoOnLevel2(self, key, value):
882 setattr(self._current_instance,key,value)
885 def addFreestyleAndFlush(self, value):
886 raise RuntimeError("Must be overloaded")
# Concrete logger pushing the pickled ScriptExecInfo to a centralized remote handle.
# NOTE(review): elided dump -- line 890 (presumably super().__init__()) is missing here.
888 class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
889 def __init__(self, handleToCentralizedInst):
891 self._remote_handle = handleToCentralizedInst
893 def addFreestyleAndFlush(self, value):
894 self._current_instance.freestyle = value
895 self.finalizeAndPushToMaster()
# Serialize the current ScriptExecInfo and hand it to the remote master.
897 def finalizeAndPushToMaster(self):
898 self._remote_handle.assign( pickle.dumps( self._current_instance ) )
# No-op logger used in replay mode, where the centralized server no longer exists.
# NOTE(review): elided dump -- lines 905 and 907-908 (presumably super().__init__()
# and the stub body) are missing here.
900 class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
902 This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
904 def __init__(self, handleToCentralizedInst = None):
906 def addFreestyleAndFlush(self, value):
# Abstract servant executing a whole python script in a managed context, with per-session
# performance logging and big-object proxying of inputs/outputs. Subclasses choose the
# execution strategy by overloading executeNow (in-process or crash-proof subprocess).
# NOTE(review): elided dump -- numerous lines (e.g. "self.code=code" presumably at 915,
# self.context initialisation around 920, try/except headers, loop headers and
# __del__/getLogScript method headers) are missing between the lines below.
909 class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
910 """The implementation of the PyScriptNode CORBA IDL that executes a script"""
911 def __init__(self, nodeName, code, poa, my_container, logscript):
912 """Initialize the node : compilation in the local context"""
913 Generic.__init__(self,poa)
914 self.nodeName=nodeName
916 self.my_container_py = my_container
917 self.my_container=my_container._container
# Register the source in linecache so tracebacks show the node's code lines.
918 linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
919 self.ccode=compile(code,nodeName,'exec')
921 self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
922 self._log_script = logscript
923 self._current_execution_session = None
924 sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session
# Strategy hook: subclasses implement the actual execution of self.ccode / self.code.
927 def executeNow(self, outargsname):
928 raise RuntimeError("Must be overloaded")
931 # force removal of self.context. Don t know why it s not done by default
932 self.removeAllVarsInContext()
935 def getContainer(self):
936 return self.my_container
# Inject a client-provided variable (pickled bytes) into the execution context.
944 def defineNewCustomVar(self,varName,valueOfVar):
945 self.context[varName] = pickle.loads(valueOfVar)
948 def executeAnotherPieceOfCode(self,code):
949 """Called for initialization of container lodging self."""
951 ccode=compile(code,self.nodeName,'exec')
952 exec(ccode, self.context)
954 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
# Replace the compiled script kept in self.ccode by a freshly compiled one.
956 def assignNewCompiledCode(self,codeStr):
959 self.ccode=compile(codeStr,self.nodeName,'exec')
961 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))
963 def executeSimple(self, key, val):
965 Same as execute method except that no pickelization mecanism is implied here. No output is expected
968 self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
969 exec(self.ccode,self.context)
971 exc_typ,exc_val,exc_fr=sys.exc_info()
972 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
973 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
974 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))
976 def execute(self,outargsname,argsin):
977 """Execute the script stored in attribute ccode with pickled args (argsin)"""
979 argsname,kws=pickle.loads(argsin)
980 self.context.update(kws)
981 exec(self.ccode, self.context)
# Collect requested output variables from the context and pickle them as a tuple.
983 for arg in outargsname:
984 if arg not in self.context:
985 raise KeyError("There is no variable %s in context" % arg)
986 argsout.append(self.context[arg])
987 argsout=pickle.dumps(tuple(argsout),-1)
990 exc_typ,exc_val,exc_fr=sys.exc_info()
991 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
992 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
993 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))
995 def executeFirst(self,argsin):
996 """ Same than first part of self.execute to reduce memory peak."""
# Inner helper so the SeqByteReceiver is garbage-collected right after use.
997 def ArgInMananger(self,argsin):
998 argsInPy = SeqByteReceiver( argsin )
999 data = argsInPy.data()
1000 self.addInfoOnLevel2("inputMem",len(data))
1001 _,kws=pickle.loads(data)
1004 self.beginOfCurrentExecutionSession()
1005 self.addTimeInfoOnLevel2("startInputTime")
1006 # to force call of SeqByteReceiver's destructor
1007 kws = ArgInMananger(self,argsin)
1008 vis = InOutputObjVisitor()
1010 # fetch real data if necessary
1011 kws[elt] = UnProxyObjectSimple( kws[elt],vis)
1012 self.addInfoOnLevel2("inputHDDMem",vis)
1013 self.context.update(kws)
1014 self.addTimeInfoOnLevel2("endInputTime")
1016 exc_typ,exc_val,exc_fr=sys.exc_info()
1017 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1018 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1019 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))
1021 def executeSecond(self,outargsname):
1022 """ Same than second part of self.execute to reduce memory peak."""
1025 self.addTimeInfoOnLevel2("startExecTime")
1027 self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
# Execute while sampling CPU/RSS of the container; samples are attached to the session log.
1028 with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams:
1029 self._current_execution_session._current_instance = self.executeNow( outargsname )
1030 cpumeminfo = ReadCPUMemInfo( monitoringParams )
1032 self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
1033 del monitoringParams
1034 self.addTimeInfoOnLevel2("endExecTime")
1035 self.addTimeInfoOnLevel2("startOutputTime")
1037 for arg in outargsname:
1038 if arg not in self.context:
1039 raise KeyError("There is no variable %s in context" % arg)
1040 argsout.append(self.context[arg])
1043 vis = InOutputObjVisitor()
1045 # the proxy mecanism is catched here
# Each output is pickled (possibly as an on-disk proxy) and served via a SenderByte servant.
1046 argPickle = SpoolPickleObject( arg, vis )
1047 retArg = SenderByte_i( self.poa,argPickle )
1048 id_o = self.poa.activate_object(retArg)
1049 retObj = self.poa.id_to_reference(id_o)
1050 ret.append( retObj._narrow( SALOME.SenderByte ) )
1051 outputMem += len(argPickle)
1052 self.addInfoOnLevel2("outputMem",outputMem)
1053 self.addInfoOnLevel2("outputHDDMem",vis)
1054 self.addTimeInfoOnLevel2("endOutputTime")
1055 self.endOfCurrentExecutionSession()
1058 exc_typ,exc_val,exc_fr=sys.exc_info()
1059 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1060 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1061 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))
# List user variables in the context (skips dunders and the container entry).
1063 def listAllVarsInContext(self):
1065 pat = re.compile("^__([a-z]+)__$")
1066 return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]
1068 def removeAllVarsInContext(self):
1069 for elt in self.listAllVarsInContext():
1070 del self.context[elt]
1072 def getValueOfVarInContext(self,varName):
1074 return pickle.dumps(self.context[varName],-1)
1076 exc_typ,exc_val,exc_fr=sys.exc_info()
1077 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1078 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
# Assign into element [0] of the named context variable from pickled bytes.
1081 def assignVarInContext(self, varName, value):
1083 self.context[varName][0] = pickle.loads(value)
1085 exc_typ,exc_val,exc_fr=sys.exc_info()
1086 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1087 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
# Call methodName on element [0] of the named context variable with pickled args.
1090 def callMethodOnVarInContext(self, varName, methodName, args):
1092 return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
1094 exc_typ,exc_val,exc_fr=sys.exc_info()
1095 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1096 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
# Open a fresh logging session and expose it to the executed script's globals.
1099 def beginOfCurrentExecutionSession(self):
1100 self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
1101 self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session
1103 def endOfCurrentExecutionSession(self):
1104 self._current_execution_session.finalizeAndPushToMaster()
1105 self._current_execution_session = None
1107 def addInfoOnLevel2(self, key, value):
1108 self._current_execution_session.addInfoOnLevel2(key, value)
# Stamp the current wall-clock time under the given key in the session log.
1110 def addTimeInfoOnLevel2(self, key):
1111 from datetime import datetime
1112 self._current_execution_session.addInfoOnLevel2(key,datetime.now())
class PyScriptNode_i(PyScriptNode_Abstract_i):
  """PyScriptNode flavour running the compiled script directly in the current process."""
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    """Execute self.ccode in-process and return the session's ScriptExecInfo."""
    session = self._current_execution_session
    return ExecLocal(self.ccode, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
  """PyScriptNode flavour running the script in a crash-proof subprocess; replay files are not kept on failure."""
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # The subprocess needs the source text, hence self.code rather than the compiled
    # self.ccode (self.code is presumably assigned in the base __init__, elided in
    # this view -- TODO confirm).
    session = self._current_execution_session
    return ExecCrashProofWithoutReplay(self.code, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
  """PyScriptNode flavour running the script in a crash-proof subprocess; replay files are kept on failure."""
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # Source text (self.code) is required because the subprocess recompiles it itself
    # (self.code is presumably assigned in the base __init__, elided in this view --
    # TODO confirm).
    session = self._current_execution_session
    return ExecCrashProofWithReplay(self.code, self.context, outargsname, self.my_container, session)