1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # File : SALOME_PyNode.py
22 # Author : Christian CAREMOLI, EDF
37 from SALOME_ContainerHelper import ScriptExecInfo
# Key under which the hosting container object is published inside every
# executed script context (see PyNode_i / PyScriptNode_Abstract_i).
MY_CONTAINER_ENTRY_IN_GLBS = "my_container"

# Key under which the per-session performance logger is published in script contexts.
MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"

# Sentinel emitted on stderr by the crash-proof subprocess to signal a clean end
# (reads "tout va bien" reversed) -- consumed by ExecCrashProofGeneric.
MY_KEY_TO_DETECT_FINISH = "neib av tuot"
class Generic(SALOME__POA.GenericObj):
  """A Python implementation of the GenericObj CORBA IDL"""
  # NOTE(review): several method bodies (Register/UnRegister/Destroy/__del__ headers
  # and the refcount bookkeeping) are elided in this view; comments below annotate
  # only the visible statements.
  def __init__(self,poa):
    #print("Register called : %d"%self.cnt)
    #print("UnRegister called : %d"%self.cnt)
    # Deactivate this servant in its POA -- presumably once the refcount drops
    # to zero (TODO confirm against the elided counter logic).
    oid=self.poa.servant_to_id(self)
    self.poa.deactivate_object(oid)
    print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
    #print("Destuctor called")
class PyNode_i (Engines__POA.PyNode,Generic):
  """The implementation of the PyNode CORBA IDL"""
  # NOTE(review): try/except scaffolding of the methods below is elided in this
  # view; the raise statements belong to those elided except-branches.
  def __init__(self, nodeName,code,poa,my_container):
    """Initialize the node : compilation in the local context"""
    Generic.__init__(self,poa)
    self.nodeName=nodeName
    self.my_container=my_container._container
    # Register the source in linecache so tracebacks can display the node's code.
    linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
    ccode=compile(code,nodeName,'exec')
    # Expose the hosting container to the executed code.
    self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
    exec(ccode, self.context)

  def getContainer(self):
    return self.my_container

  def defineNewCustomVar(self,varName,valueOfVar):
    # valueOfVar arrives pickled from the caller.
    self.context[varName] = pickle.loads(valueOfVar)

  def executeAnotherPieceOfCode(self,code):
    """Called for initialization of container lodging self."""
    ccode=compile(code,self.nodeName,'exec')
    exec(ccode, self.context)
    # (elided except-branch) compilation/execution failure is reported as a CORBA exception.
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))

  def execute(self,funcName,argsin):
    """Execute the function funcName found in local context with pickled args (argsin)"""
    argsin,kws=pickle.loads(argsin)
    func=self.context[funcName]
    argsout=func(*argsin,**kws)
    # Result is sent back pickled with the highest protocol.
    argsout=pickle.dumps(argsout,-1)
    # (elided except-branch) format the traceback and forward it to the caller.
    exc_typ,exc_val,exc_fr=sys.exc_info()
    l=traceback.format_exception(exc_typ,exc_val,exc_fr)
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
class SenderByte_i(SALOME__POA.SenderByte,Generic):
  """CORBA servant holding a bytes payload and serving it to clients by slices."""
  def __init__(self,poa,bytesToSend):
    Generic.__init__(self,poa)
    self.bytesToSend = bytesToSend
  # getSize (def line elided in this view): total payload length in bytes.
    return len(self.bytesToSend)
  def sendPart(self,n1,n2):
    # Half-open slice [n1:n2] of the payload.
    return self.bytesToSend[n1:n2]
# Process-wide reference counter of on-disk proxy files: fname -> number of live proxies.
DicoForProxyFile = { }
def GetSizeOfBufferedReader(f):
  """
  Return the size in bytes of an opened file.

  Args:
  -----
  f (io.IOBase): buffered reader returned by open

  NOTE(review): the tell() capture/restore lines are elided in this view;
  presumably pos = f.tell() precedes the seeks -- confirm against full source.
  """
  # Seek to the end to measure the size, then restore the previous position.
  f.seek(0,io.SEEK_END)
  f.seek(pos,io.SEEK_SET)
def GetObjectFromFile(fname, visitor = None):
  """
  Unpickle and return the object stored in file fname.

  NOTE(review): the pickle.load call, the guard on visitor and the return are
  elided in this view.
  """
  with open(fname,"rb") as f:
    # Report the on-disk footprint and origin file to the optional visitor.
    visitor.setHDDMem( GetSizeOfBufferedReader(f) )
    visitor.setFileName( fname )
def DumpInFile(obj,fname):
  """Write the already-pickled payload obj into file fname (write call elided in this view)."""
  with open(fname,"wb") as f:
def IncrRefInFile(fname):
  """Increment the shared reference counter of proxy file fname."""
  if fname in DicoForProxyFile:
    DicoForProxyFile[fname] += 1
  # (elided else-branch) first registration: counter starts at 2 -- the dump
  # itself plus this new reference.  TODO confirm intent against full source.
    DicoForProxyFile[fname] = 2
def DecrRefInFile(fname):
  """
  Decrement the reference counter of proxy file fname; once unreferenced the
  file itself is removed.

  NOTE(review): early-return, the zero-test around del/unlink and the unlink
  call are elided in this view.
  """
  if fname not in DicoForProxyFile:
    cnt = DicoForProxyFile[fname]
    DicoForProxyFile[fname] -= 1
    del DicoForProxyFile[fname]
    if os.path.exists(fname):
def GetBigObjectOnDiskThreshold():
  """Return the size threshold (in bytes) above which objects are spooled to disk."""
  threshold = KernelBasis.GetBigObjOnDiskThreshold()
  return threshold
def ActivateProxyMecanismOrNot( sizeInByte ):
  """
  Tell whether a payload of sizeInByte bytes must go through the on-disk
  proxy mechanism (strictly greater than the configured threshold).

  NOTE(review): special-case handling of the threshold value (lines 187-189)
  is elided in this view.
  """
  thres = GetBigObjectOnDiskThreshold()
  return sizeInByte > thres
def GetBigObjectDirectory():
  """Return the directory configured to receive big-object dumps; raise if unset."""
  if not KernelBasis.BigObjOnDiskDirectoryDefined():
    raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
  # Expand ~ and environment variables present in the configured path.
  return os.path.expanduser( os.path.expandvars( KernelBasis.GetBigObjOnDiskDirectory() ) )
def GetBigObjectFileName():
  """
  Return a filename in the most secure manner (see tempfile documentation)

  NOTE(review): the capture of f.name and the return are elided in this view;
  only the unique name is kept -- the temporary file itself is auto-deleted.
  """
  with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
class BigObjectOnDiskBase:
  """
  Base class of proxies standing for a big pickled object dumped into a file
  on disk. Life cycle of the file is reference-counted via DicoForProxyFile.

  NOTE(review): several method headers (incrRef/decrRef/__float__/__int__/__str__,
  condition lines) are elided in this view.
  """
  def __init__(self, fileName, objSerialized):
    """
    :param fileName: the file used to dump into.
    :param objSerialized: the object in pickeled form
    :type objSerialized: bytes
    """
    self._filename = fileName
    # attribute _destroy is here to tell client side or server side
    # only client side can be with _destroy set to True. server side due to risk of concurrency
    # so pickled form of self must be done with this attribute set to False.
    self._destroy = False
    self.__dumpIntoFile(objSerialized)

  def getDestroyStatus(self):
    # (incrRef path) bump the shared file refcount.
    IncrRefInFile( self._filename )
    # should never happen !
    # NOTE(review): exception instance is built but never raised -- likely a
    # missing `raise` in the original; left untouched here.
    RuntimeError("Invalid call to incrRef !")
    # (decrRef path) drop the shared file refcount.
    DecrRefInFile( self._filename )
    # should never happen !
    # NOTE(review): same missing `raise` pattern as above.
    RuntimeError("Invalid call to decrRef !")

  def unlinkOnDestructor(self):
  def doNotTouchFile(self):
    """
    Method called slave side. The life cycle management of file is client side not slave side.
    """
    self._destroy = False
    # (elided __del__ guard) decrement the file refcount at destruction time.
    DecrRefInFile( self._filename )

  def getFileName(self):
    return self._filename

  def __dumpIntoFile(self, objSerialized):
    DumpInFile( objSerialized, self._filename )

  def get(self, visitor = None):
    # Load the pickled payload back from disk, notifying the optional visitor.
    obj = GetObjectFromFile( self._filename, visitor )
    # __float__ / __int__ conversions (def lines elided):
    return float( self.get() )
    return int( self.get() )
    # __str__ (def line elided): only string payloads are rendered.
    if isinstance(obj,str):
    raise RuntimeError("Not a string")
class BigObjectOnDisk(BigObjectOnDiskBase):
  """Proxy for a scalar (non-sequence) big object dumped on disk."""
  def __init__(self, fileName, objSerialized):
    BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
class BigObjectOnDiskListElement(BigObjectOnDiskBase):
  """Proxy for one element (by position) of a big sequence dumped on disk."""
  def __init__(self, pos, length, fileName):
    # Deliberately bypasses the base __init__: the backing file already exists
    # and is shared with the sequence proxy -- nothing is dumped here.
    self._filename = fileName
    self._destroy = False
    # NOTE(review): the _pos assignment is elided in this view.
    self._length = length

  def get(self, visitor = None):
    # Load the whole sequence from disk, then select the element at _pos.
    fullObj = BigObjectOnDiskBase.get(self, visitor)
    return fullObj[ self._pos ]

  def __getitem__(self, i):
    # __len__ (def line elided in this view):
    return len(self.get())
class BigObjectOnDiskSequence(BigObjectOnDiskBase):
  """Proxy for a big list/tuple dumped on disk; indexing stays lazy per element."""
  def __init__(self, length, fileName, objSerialized):
    BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
    self._length = length

  def __getitem__(self, i):
    # Each element is itself a proxy sharing the same file; the payload is
    # only loaded when the element's get() is invoked.
    return BigObjectOnDiskListElement(i, self._length, self.getFileName())
class BigObjectOnDiskList(BigObjectOnDiskSequence):
  """On-disk proxy specialization for list payloads."""
  def __init__(self, length, fileName, objSerialized):
    BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
  """On-disk proxy specialization for tuple payloads."""
  def __init__(self, length, fileName, objSerialized):
    BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
  """
  This method return a proxy instance of pickled form of object given in input.

  Args:
  -----
  obj (pickelable type) : object to be proxified
  pickleObjInit (bytes) : Optionnal. Original pickeled form of object to be proxyfied if already computed. If not this method generate it
  visitor : optional visitor notified of the on-disk footprint

  Returns:
  --------
  BigObjectOnDiskBase: proxy instance

  NOTE(review): the guard on visitor, the else-branch and the return are elided
  in this view.
  """
  pickleObj = pickleObjInit
  if pickleObj is None:
    pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
  fileName = GetBigObjectFileName()
  # Report footprint and destination file to the visitor.
  visitor.setHDDMem( len(pickleObj) )
  visitor.setFileName(fileName)
  # Sequences get dedicated proxies preserving len() and lazy indexing.
  if isinstance( obj, list):
    proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
  elif isinstance( obj, tuple):
    proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
  # (elided else-branch) any other payload becomes a scalar proxy.
    proxyObj = BigObjectOnDisk( fileName , pickleObj )
def SpoolPickleObject( obj, visitor = None ):
  """
  Pickle obj; when the payload exceeds the big-object threshold, spool it to
  disk and return the pickled proxy instead of the raw payload.

  NOTE(review): both return statements are elided in this view.
  """
  with InOutputObjVisitorCM(visitor) as v:
    pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
    # Small payloads short-circuit: no proxy, the raw pickle is used.
    if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
    proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
    pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
353 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
def UnProxyObjectSimple( obj, visitor = None ):
  """
  Method to be called in Remote mode. Alterate the obj _status attribute.
  Because the slave process does not participate in the reference counting

  Args:
  -----
  visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD

  NOTE(review): the proxy branch body, the retObj/loop setup and the returns
  are elided in this view.
  """
  with InOutputObjVisitorCM(visitor) as v:
    logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
    if isinstance(obj,BigObjectOnDiskBase):
    elif isinstance( obj, list):
      # Recursively unproxify each list element.
      retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
def UnProxyObjectSimpleLocal( obj ):
  """
  Method to be called in Local mode. Do not alterate the PyObj counter

  NOTE(review): the proxy branch body, loop setup and returns are elided in
  this view.
  """
  if isinstance(obj,BigObjectOnDiskBase):
  elif isinstance( obj, list):
    # Recursively unproxify each list element.
    retObj.append( UnProxyObjectSimpleLocal(elt) )
  # NOTE(review): enclosing class header (FileHolder) is elided in this view.
  # Simple wrapper around a filename with no life-cycle management.
  def __init__(self, fileName):
    self._filename = fileName
  # filename property getter (decorator/def line elided):
    return self._filename
class FileDeleter(FileHolder):
  """FileHolder variant that unlinks its file at destruction time."""
  def __init__(self, fileName):
    super().__init__( fileName )
  # __del__ (def line elided): remove the held file if it still exists.
    if os.path.exists( self._filename ):
      os.unlink( self._filename )
class MonitoringInfo:
  """
  Descriptor of a monitoring subprocess: its python script, polling interval,
  output file and pid.

  NOTE(review): the @property decorators, the pid getter and the pid-setter
  body are elided in this view.
  """
  def __init__(self, pyFileName, intervalInMs, outFileName, pid):
    self._py_file_name = pyFileName      # FileHolder/FileDeleter of the monitoring script
    self._interval_in_ms = intervalInMs  # polling period in milliseconds
    self._out_file_name = outFileName    # FileHolder/FileDeleter receiving measurements

  def pyFileName(self):
    return self._py_file_name

  # pid setter (body elided in this view):
  def pid(self, value):

  def outFileName(self):
    return self._out_file_name

  def intervalInMs(self):
    return self._interval_in_ms
def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
  """
  This method loops indefinitely every intervalInMs milliseconds to scan
  number of inodes and size of content recursively included into the in input directory.

  Args:
  -----
  intervalInMs (int) : polling period in milliseconds
  dirNameToInspect (str) : directory to watch
  outFileName (str) : name of file inside the results will be written. If None a new file is generated

  See also CPUMemoryMonitoring

  NOTE(review): several lines are elided in this view (tempPyFile capture,
  f.write opening of the generated-script template, try/except inside the
  template). No comments are inserted inside the template string below since
  they would end up in the generated monitoring script.
  """
  dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
  # outFileNameSave stores the content of outFileName during phase of dumping
  with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
    outFileNameSave = f.name
  with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
    tempOutFile = outFileName
    if tempOutFile is None:
      tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
  with open(tempPyFile,"w") as f:
import subprocess as sp
with open("{tempOutFile}","a") as f:
  f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
  f.write( "{{}}\\n".format( "{intervalInMs}" ) )
      nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]), ), shell = True).decode().strip()
      st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
      szOfDirStr = re.split("[\s]+",st)[0]
  f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
  f.write( "{{}}\\n".format( str( nbinodes ) ) )
  f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
  time.sleep( {intervalInMs} / 1000.0 )
""".format( **locals()))
  logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
  pyFileName = FileDeleter( tempPyFile )
  # Auto-generated output files are auto-deleted; caller-provided ones are kept.
  if outFileName is None:
    outFileName = FileDeleter( tempOutFile )
    outFileName = FileHolder(outFileName)
  return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
  """
  Launch a subprocess monitoring self process.
  This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
  CPU usage and RSS memory of the calling process.
  Communication between subprocess and self is done by file.

  Args:
  -----
  intervalInMs (int): polling period in milliseconds
  outFileName (str) : name of file inside the results will be written. If None a new file is generated

  See also FileSystemMonitoring

  NOTE(review): several lines are elided in this view (tempPyFile capture, pid
  retrieval, while-loop of the template). No comments are inserted inside the
  template string: they would be written into the generated script.
  """
  def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
    with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
      tempOutFile = outFileName
      if tempOutFile is None:
        tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
    with open(tempPyFile,"w") as f:
      f.write("""import psutil
process = psutil.Process( pid )
def getChargeOf( p ):
  a,b = p.cpu_percent(), p.memory_info().rss
  for c in p.children():
    a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
with open("{}","a") as f:
  f.write( "{{}}\\n".format( "{}" ) )
    cpu,mem_rss = getChargeOf( process )
    f.write( "{{}}\\n".format( str( cpu ) ) )
    f.write( "{{}}\\n".format( str( mem_rss ) ) )
    time.sleep( {} / 1000.0 )
""".format(pid, tempOutFile, intervalInMs, intervalInMs))
    # Auto-generated output files are auto-deleted; caller-provided ones are kept.
    if outFileName is None:
      autoOutFile = FileDeleter(tempOutFile)
      autoOutFile = FileHolder(tempOutFile)
    return FileDeleter(tempPyFile),autoOutFile
  pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
  return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
class GenericPythonMonitoringLauncherCtxMgr:
  """Context manager launching a monitoring subprocess on entry and stopping it on exit."""
  def __init__(self, monitoringParams):
    """
    Args:
    -----
    monitoringParams (MonitoringInfo)
    """
    self._monitoring_params = monitoringParams

  # __enter__ (def line elided): launch the monitoring script and hand back the params.
    pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
    self._monitoring_params.pid = pid
    return self._monitoring_params

  def __exit__(self,exctype, exc, tb):
    StopMonitoring( self._monitoring_params )
    del self._monitoring_params
    gc.collect() # force destruction of objects even in raise context
def StopMonitoring( monitoringInfo ):
  """
  Kill monitoring subprocess.

  Args:
  -----
  monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
  """
  KernelBasis.StopMonitoring(monitoringInfo.pid)
  # NOTE(review): enclosing class header (CPUMemInfo) is elided in this view,
  # as are the @property decorators and the __str__/data headers.
  def __init__(self, intervalInMs, cpu, mem_rss):
    """
    Args:
    -----
    intervalInMs (int)
    cpu (list<float>) CPU usage
    mem_rss (list<int>) rss memory usage
    """
    self._interval_in_ms = intervalInMs
    # One (cpu, rss) pair per sample.
    self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
  # __str__ (def line elided):
    st = """Interval in ms : {self.intervalInMs}
""".format( **locals() )
  def intervalInMs(self):
    return self._interval_in_ms
  # data property (decorator/def lines elided):
    """
    list of pairs. First param of pair is cpu usage
    Second param of pair is memory usage
    """
def ReadCPUMemInfoInternal( fileName ):
  """Parse a CPU/RSS monitoring dump file into a CPUMemInfo instance."""
  cpu = [] ; mem_rss = []
  if os.path.exists( fileName ):
    with open(fileName, "r") as f:
      coarseData = [ elt.strip() for elt in f.readlines() ]
      # Layout: first line is the polling interval, then alternating cpu / rss samples.
      intervalInMs = int( coarseData[0] )
      coarseData = coarseData[1:]
      cpu = [float(elt) for elt in coarseData[::2]]
      mem_rss = [ int(elt) for elt in coarseData[1::2]]
  return CPUMemInfo(intervalInMs,cpu,mem_rss)
def ReadCPUMemInfo( monitoringInfo ):
  """
  Retrieve CPU/Mem data of monitoring.

  Args:
  -----
  monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
  """
  return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
  # NOTE(review): enclosing class header (InodeSizeInfo) is elided in this view,
  # as are the @property decorators and the __str__/data headers.
  def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
    """
    Args:
    -----
    timeStamps (list<datetimestruct>)
    nbInodes (list<int>)
    volumeOfDir (list<str>)
    """
    self._dir_name_monitored = dirNameMonitored
    self._interval_in_ms = intervalInMs
    # One (timestamp, #inodes, size) triplet per sample.
    self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
  # __str__ (def line elided):
    st = """Filename monitored : {self.dirNameMonitored}
Interval in ms : ${self.intervalInMs}
""".format( **locals() )
  def dirNameMonitored(self):
    return self._dir_name_monitored
  def intervalInMs(self):
    return self._interval_in_ms
  # data property (decorator/def lines elided):
    """
    list of triplets. First param of triplet is datetimestruct
    Second param of triplet is #inodes.
    Third param of triplet is size.
    """
def ReadInodeSizeInfoInternal( fileName ):
  """Parse a filesystem monitoring dump file into an InodeSizeInfo instance."""
  with open(fileName, "r") as f:
    coarseData = [ elt.strip() for elt in f.readlines() ]
  # Layout: monitored dir, polling interval, then repeating
  # (timestamp, #inodes, size) triplets.
  dirNameMonitored = coarseData[0] ; intervalInMs = int( coarseData[1] ) ; coarseData = coarseData[2:]
  tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
  nbInodes = [int(elt) for elt in coarseData[1::3]]
  volumeOfDir = coarseData[2::3]
  return InodeSizeInfo(dirNameMonitored,intervalInMs,tss,nbInodes,volumeOfDir)
def ReadInodeSizeInfo( monitoringInfo ):
  """
  Retrieve nb of inodes and size of monitoring

  Args:
  -----
  monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
  """
  return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
class SeqByteReceiver:
  """
  Client-side helper fetching the byte payload of a remote SenderByte servant,
  splitting transfers above CHUNK_SIZE to limit the memory peak.

  NOTE(review): the __init__ body and the __del__/data def lines are elided in
  this view.
  """
  # 2GB limit to trigger split into chunks
  CHUNK_SIZE = 2000000000
  def __init__(self,sender):
  # __del__ (def line elided): release the remote servant reference.
    self._obj.UnRegister()
  # data (def line elided): fetch the whole payload, one-shot or chunked.
    size = self._obj.getSize()
    if size <= SeqByteReceiver.CHUNK_SIZE:
      return self.fetchOneShot( size )
    return self.fetchByChunks( size )

  def fetchOneShot(self,size):
    return self._obj.sendPart(0,size)

  def fetchByChunks(self,size):
    """
    To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
    """
    data_for_split_case = bytes(0)
    # Effective chunk is 1/8 of CHUNK_SIZE (250MB).
    EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
    iStart = 0 ; iEnd = EFF_CHUNK_SIZE
    # Loop ends when the window collapses (iStart == iEnd == size).
    while iStart!=iEnd and iEnd <= size:
      part = self._obj.sendPart(iStart,iEnd)
      data_for_split_case = bytes(0).join( [data_for_split_case,part] )
      iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
    return data_for_split_case
# Template of the driver script executed by the crash-proof subprocess (see
# ExecCrashProofGeneric): it rebuilds the session logger from its IOR, loads the
# pickled input context, executes the user code, then pickles the filtered
# output context back to a file for the parent process.
# NOTE(review): several template lines (inputFileName/codeFileName assignments,
# code read, closing quotes) are elided in this view; no comments are added
# inside the string since it is runtime content.
FinalCode = """import pickle
from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
orb = CORBA.ORB_init([''])
outputFileName = "{}"
exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
with open(inputFileName,"rb") as f:
  context = pickle.load( f )
context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
with open(codeFileName,"r") as f:
exec( code , context )
# filter part of context to be exported to father process
context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
with open(outputFileName,"wb") as f:
  pickle.dump( context, f )
class PythonFunctionEvaluatorParams:
  """
  Holds the four files involved in a crash-proof subprocess run (driver script,
  user code, input context, output context) plus helpers to clean or replay them.

  NOTE(review): @property decorators, the result/replayCmd headers and several
  condition lines are elided in this view. No comments are inserted inside the
  f-strings below (runtime content).
  """
  def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
    self._main_filename = mainFileName
    self._code_filename = codeFileName
    self._in_context_filename = inContextFileName
    self._out_context_filename = outContextFileName

  # result property (decorator/def lines elided): unpickle the output context.
    with open(self._out_context_filename,"rb") as f:
      return pickle.load( f )

  def destroyOnOK(self):
    # Successful run: every temporary file can go.
    for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
      if os.path.exists( fileToDestroy ):
        os.unlink( fileToDestroy )

  def destroyOnKO(self, containerRef):
    """
    Called in the context of failure with replay mode activated
    """
    # Only the (possibly partial) output context is dropped; replay inputs are kept.
    for fileToDestroy in [self._out_context_filename]:
      if os.path.exists( fileToDestroy ):
        os.unlink( fileToDestroy )
    # register to container files group associated to the
    containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])

  # replayCmd property (decorator/def lines elided):
    return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))

  def cleanOperations(self):
    return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )

  def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
    # (elided condition) plain message when no replay material is available.
    return f"return with non zero code ({returnCode})"
    if keepFilesToReplay:
      return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
{self.cleanOperations}
      return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay, closeEyesOnErrorAtExit):
  """
  Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.

  Args:
  -----
  code (str) : python code to be executed using context
  context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
  outargsname (list<str>) : list of arguments to be exported
  containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
  instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
  keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
  closeEyesOnErrorAtExit (bool) : if True in case of crash of subprocess, if MY_KEY_TO_DETECT_FINISH is displayed at the end of stdout

  Returns:
  --------
  ScriptExecInfo : instance serverside

  Note:
  -----
  context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.

  NOTE(review): numerous lines are elided in this view (guards, else-branches,
  try blocks, string openers of the appended sentinel snippet); comments below
  annotate only the visible statements.
  """
  import subprocess as sp
  def IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr ):
    # Decide success, optionally trusting the finish sentinel printed on stderr.
    def StdErrTreatment(closeEyesOnErrorAtExit , stderr):
      if not closeEyesOnErrorAtExit:
        # Strip the trailing sentinel from the stderr shown to the user.
        return stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
      return True,StdErrTreatment(closeEyesOnErrorAtExit , stderr)
    if not closeEyesOnErrorAtExit:
      # OK iff stderr ends with the sentinel; return stderr without it.
      return stderr[-len(MY_KEY_TO_DETECT_FINISH):] == MY_KEY_TO_DETECT_FINISH,stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
  def InternalExecResistant( code, context, outargsname):
    orb = CORBA.ORB_init([''])
    iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
    EXEC_CODE_FNAME_PXF = "execsafe_"
    def RetrieveUniquePartFromPfx( fname ):
      # Unique suffix generated by NamedTemporaryFile, shared by all 4 files of a run.
      return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
    with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
      if closeEyesOnErrorAtExit:
sys.stderr.write({!r})
sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
      codeFileName = os.path.basename( codeFd.name )
      contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
      with open(contextFileName,"wb") as contextFd:
        pickle.dump( context, contextFd)
      resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
      mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
      with open(mainExecFileName,"w") as f:
        f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
      # Retry loop: relaunch the driver subprocess on non-zero exit code.
      for iTry in range( KernelBasis.GetNumberOfRetry() ):
        print( "WARNING : Retry # {}. Following code has generated non zero return code ( {} ). Trying again ... \n{}".format( iTry, returnCode, code ) )
        p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
        stdout, stderr = p.communicate()
        returnCode = p.returncode
      return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
  ret = instanceOfLogOfCurrentSession._current_instance
  returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
  # Forward child output to this process' streams.
  stdout = stdout.decode()
  stderr = stderr.decode()
  sys.stdout.write( stdout ) ; sys.stdout.flush()
  isOK, stderr = IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr )
  sys.stderr.write( stderr ) ; sys.stderr.flush()
  # Success path: retrieve the remotely-logged ScriptExecInfo and merge outputs
  # back into the caller's context.
  pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
  if len(pcklData) > 0:
    ret = pickle.loads( pcklData )
  context.update( evParams.result )
  evParams.destroyOnOK()
  print( "WARNING : Following code has generated non zero return code ( {} ) but considered as OK\n{}".format( returnCode, code ) )
  # Failure path: keep or destroy the replay material depending on the flag.
  if keepFilesToReplay:
    evParams.destroyOnKO( containerRef )
  evParams.destroyOnOK()
  raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """Crash-proof exec keeping the replay files on failure; no fault-tolerant stderr sentinel."""
  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
                               keepFilesToReplay = True, closeEyesOnErrorAtExit = False)
def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """Crash-proof exec destroying the replay files on failure; no fault-tolerant stderr sentinel."""
  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
                               keepFilesToReplay = False, closeEyesOnErrorAtExit = False)
def ExecCrashProofWithReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """Fault-tolerant variant of ExecCrashProofWithReplay: trusts the stderr finish sentinel."""
  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
                               keepFilesToReplay = True, closeEyesOnErrorAtExit = True)
def ExecCrashProofWithoutReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """Fault-tolerant variant of ExecCrashProofWithoutReplay: trusts the stderr finish sentinel."""
  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
                               keepFilesToReplay = False, closeEyesOnErrorAtExit = True)
def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """
  Plain in-process execution of code within context (no subprocess isolation).
  Returns the current ScriptExecInfo held by the session logger.
  """
  exec( code, context )
  execInfo = instanceOfLogOfCurrentSession._current_instance
  return execInfo
class LogOfCurrentExecutionSessionAbs(abc.ABC):
  """
  Base of execution-session loggers accumulating measurements into a ScriptExecInfo.

  NOTE(review): the __init__ def line and some decorators are elided in this view.
  """
  # __init__ (def line elided):
    self._current_instance = ScriptExecInfo()

  def addInfoOnLevel2(self, key, value):
    # Store measurement 'key' directly as an attribute of the current ScriptExecInfo.
    setattr(self._current_instance,key,value)

  def addFreestyleAndFlush(self, value):
    raise RuntimeError("Must be overloaded")
class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
  """Session logger pushing its ScriptExecInfo to a remote centralized handle."""
  def __init__(self, handleToCentralizedInst):
    self._remote_handle = handleToCentralizedInst

  def addFreestyleAndFlush(self, value):
    self._current_instance.freestyle = value
    self.finalizeAndPushToMaster()

  def finalizeAndPushToMaster(self):
    """
    Voluntary do nothing in case of problem to avoid to trouble execution

    NOTE(review): the try/except swallowing failures is elided in this view.
    """
    self._remote_handle.assign( pickle.dumps( self._current_instance ) )
class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
  """
  This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
  """
  # NOTE(review): method bodies (presumably no-ops) are elided in this view.
  def __init__(self, handleToCentralizedInst = None):
  def addFreestyleAndFlush(self, value):
956 class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
957 """The implementation of the PyScriptNode CORBA IDL that executes a script"""
958 def __init__(self, nodeName, code, poa, my_container, logscript):
959 """Initialize the node : compilation in the local context"""
960 Generic.__init__(self,poa)
961 self.nodeName=nodeName
963 self.my_container_py = my_container
964 self.my_container=my_container._container
965 linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
966 self.ccode=compile(code,nodeName,'exec')
968 self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
969 self._log_script = logscript
970 self._current_execution_session = None
971 sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session
974 def executeNow(self, outargsname):
975 raise RuntimeError("Must be overloaded")
978 # force removal of self.context. Don t know why it s not done by default
979 self.removeAllVarsInContext()
982 def getContainer(self):
983 return self.my_container
991 def defineNewCustomVar(self,varName,valueOfVar):
992 self.context[varName] = pickle.loads(valueOfVar)
995 def executeAnotherPieceOfCode(self,code):
996 """Called for initialization of container lodging self."""
998 ccode=compile(code,self.nodeName,'exec')
999 exec(ccode, self.context)
1001 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
1003 def assignNewCompiledCode(self,codeStr):
1006 self.ccode=compile(codeStr,self.nodeName,'exec')
1008 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))
1010 def executeSimple(self, key, val):
1012 Same as execute method except that no pickelization mecanism is implied here. No output is expected
1015 self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
1016 exec(self.ccode,self.context)
1018 exc_typ,exc_val,exc_fr=sys.exc_info()
1019 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1020 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1021 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))
1023 def execute(self,outargsname,argsin):
1024 """Execute the script stored in attribute ccode with pickled args (argsin)"""
1026 argsname,kws=pickle.loads(argsin)
1027 self.context.update(kws)
1028 exec(self.ccode, self.context)
1030 for arg in outargsname:
1031 if arg not in self.context:
1032 raise KeyError("There is no variable %s in context" % arg)
1033 argsout.append(self.context[arg])
1034 argsout=pickle.dumps(tuple(argsout),-1)
1037 exc_typ,exc_val,exc_fr=sys.exc_info()
1038 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1039 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1040 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))
1042 def executeFirst(self,argsin):
1043 """ Same than first part of self.execute to reduce memory peak."""
1044 def ArgInMananger(self,argsin):
1045 argsInPy = SeqByteReceiver( argsin )
1046 data = argsInPy.data()
1047 self.addInfoOnLevel2("inputMem",len(data))
1048 _,kws=pickle.loads(data)
1051 self.beginOfCurrentExecutionSession()
1052 self.addTimeInfoOnLevel2("startInputTime")
1053 # to force call of SeqByteReceiver's destructor
1054 kws = ArgInMananger(self,argsin)
1055 vis = InOutputObjVisitor()
1057 # fetch real data if necessary
1058 kws[elt] = UnProxyObjectSimple( kws[elt],vis)
1059 self.addInfoOnLevel2("inputHDDMem",vis)
1060 self.context.update(kws)
1061 self.addTimeInfoOnLevel2("endInputTime")
1063 exc_typ,exc_val,exc_fr=sys.exc_info()
1064 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1065 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1066 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))
def executeSecond(self, outargsname):
    """Second half of execute(): run the script under CPU/memory monitoring
    and ship the requested output variables back to the caller (split from
    executeFirst to reduce the memory peak).

    :param outargsname: names of the context variables to return.
    :return: list of SALOME.SenderByte CORBA references, one per output variable.
    :raises SALOME.SALOME_Exception: wrapping any Python error with its full traceback.
    """
    def executeSecondInternal(monitoringtimeresms):
        # Sample CPU/memory consumption while the user code is running.
        with GenericPythonMonitoringLauncherCtxMgr(CPUMemoryMonitoring(monitoringtimeresms)) as monitoringParams:
            currentInstance = self.executeNow(outargsname)
            cpumeminfo = ReadCPUMemInfo(monitoringParams)
        return cpumeminfo, currentInstance
    try:
        self.addTimeInfoOnLevel2("startExecTime")
        self.addInfoOnLevel2("measureTimeResolution", self.my_container_py.monitoringtimeresms())
        cpumeminfo, self._current_execution_session._current_instance = executeSecondInternal(self.my_container_py.monitoringtimeresms())
        self.addInfoOnLevel2("CPUMemDuringExec", cpumeminfo)
        self.addTimeInfoOnLevel2("endExecTime")
        self.addTimeInfoOnLevel2("startOutputTime")
        # collect requested outputs, failing fast on an unknown name
        argsout = []
        for arg in outargsname:
            if arg not in self.context:
                raise KeyError("There is no variable %s in context" % arg)
            argsout.append(self.context[arg])
        ret = []
        outputMem = 0
        vis = InOutputObjVisitor()
        for arg in argsout:
            # the proxy mecanism is catched here
            argPickle = SpoolPickleObject(arg, vis)
            retArg = SenderByte_i(self.poa, argPickle)
            id_o = self.poa.activate_object(retArg)
            retObj = self.poa.id_to_reference(id_o)
            ret.append(retObj._narrow(SALOME.SenderByte))
            outputMem += len(argPickle)
        self.addInfoOnLevel2("outputMem", outputMem)
        self.addInfoOnLevel2("outputHDDMem", vis)
        self.addTimeInfoOnLevel2("endOutputTime")
        self.endOfCurrentExecutionSession()
        return ret
    except Exception:
        exc_typ, exc_val, exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ, exc_val, exc_fr)
        print("".join(l)); sys.stdout.flush()  # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM, "".join(l), "PyScriptNode:Second %s, outargsname: %s" % (self.nodeName, outargsname), 0))
def listAllVarsInContext(self):
    """Return the names of the user variables held in the execution context,
    skipping dunder entries (e.g. __builtins__) and the container binding."""
    dunder = re.compile(r"^__([a-z]+)__$")
    names = []
    for name in self.context:
        if name == MY_CONTAINER_ENTRY_IN_GLBS:
            continue
        if dunder.match(name):
            continue
        names.append(name)
    return names
def removeAllVarsInContext(self):
    """Delete every user variable from the execution context (dunder entries
    and the container binding are preserved by listAllVarsInContext)."""
    for name in list(self.listAllVarsInContext()):
        del self.context[name]
def getValueOfVarInContext(self, varName):
    """Return the pickled value of context variable *varName*.

    :raises SALOME.SALOME_Exception: if varName is absent from the context
        or its value cannot be pickled.
    """
    try:
        return pickle.dumps(self.context[varName], -1)
    except Exception:
        exc_typ, exc_val, exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ, exc_val, exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM, "".join(l), "PyScriptNode: %s" % self.nodeName, 0))
def assignVarInContext(self, varName, value):
    """Unpickle *value* and store it as element 0 of context[varName].

    NOTE(review): this writes into ``[0]`` of an existing container, so
    *varName* must already be bound to an indexable object — confirm with callers.

    :raises SALOME.SALOME_Exception: on missing variable or unpickling failure.
    """
    try:
        self.context[varName][0] = pickle.loads(value)
    except Exception:
        exc_typ, exc_val, exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ, exc_val, exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM, "".join(l), "PyScriptNode: %s" % self.nodeName, 0))
def callMethodOnVarInContext(self, varName, methodName, args):
    """Invoke ``context[varName][0].methodName(*unpickled_args)`` and return
    the pickled result.

    :param args: pickled tuple of positional arguments for the method.
    :raises SALOME.SALOME_Exception: on missing variable/method or pickling failure.
    """
    try:
        return pickle.dumps(getattr(self.context[varName][0], methodName)(*pickle.loads(args)), -1)
    except Exception:
        exc_typ, exc_val, exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ, exc_val, exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM, "".join(l), "PyScriptNode: %s" % self.nodeName, 0))
def beginOfCurrentExecutionSession(self):
    """Open a new execution-session log and expose it to the executed
    script's globals under MY_PERFORMANCE_LOG_ENTRY_IN_GLBS."""
    session = LogOfCurrentExecutionSession(self._log_script.addExecutionSession())
    self._current_execution_session = session
    self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = session
def endOfCurrentExecutionSession(self):
    """Flush the current session log to the master process and drop the reference."""
    session = self._current_execution_session
    session.finalizeAndPushToMaster()
    self._current_execution_session = None
def addInfoOnLevel2(self, key, value):
    """Record *key* -> *value* in the current (level-2) execution-session log."""
    session = self._current_execution_session
    session.addInfoOnLevel2(key, value)
def addTimeInfoOnLevel2(self, key):
    """Record the current wall-clock time under *key* in the current
    (level-2) execution-session log."""
    import datetime as _dt
    self._current_execution_session.addInfoOnLevel2(key, _dt.datetime.now())
class PyScriptNode_i(PyScriptNode_Abstract_i):
    """Script node whose executeNow delegates to ExecLocal, running the
    pre-compiled code (self.ccode) in the current process."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        """Execute the compiled code in-process and return the execution instance."""
        return ExecLocal(self.ccode, self.context, outargsname, self.my_container, self._current_execution_session)
class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
    """Script node whose executeNow delegates to ExecCrashProofWithoutReplay
    (out-of-process execution per the class name)."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        """Execute self.code via ExecCrashProofWithoutReplay and return the execution instance."""
        return ExecCrashProofWithoutReplay(self.code, self.context, outargsname, self.my_container, self._current_execution_session)
class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
    """Script node whose executeNow delegates to ExecCrashProofWithReplay
    (out-of-process execution with replay support per the class name)."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        """Execute self.code via ExecCrashProofWithReplay and return the execution instance."""
        return ExecCrashProofWithReplay(self.code, self.context, outargsname, self.my_container, self._current_execution_session)
class PyScriptNode_OutOfProcess_FT_i(PyScriptNode_Abstract_i):
    """Script node whose executeNow delegates to ExecCrashProofWithoutReplayFT
    (fault-tolerant out-of-process execution per the class name)."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        """Execute self.code via ExecCrashProofWithoutReplayFT and return the execution instance."""
        return ExecCrashProofWithoutReplayFT(self.code, self.context, outargsname, self.my_container, self._current_execution_session)
class PyScriptNode_OutOfProcess_Replay_FT_i(PyScriptNode_Abstract_i):
    """Script node whose executeNow delegates to ExecCrashProofWithReplayFT
    (fault-tolerant out-of-process execution with replay per the class name)."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        """Execute self.code via ExecCrashProofWithReplayFT and return the execution instance."""
        return ExecCrashProofWithReplayFT(self.code, self.context, outargsname, self.my_container, self._current_execution_session)