1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # File : SALOME_PyNode.py
22 # Author : Christian CAREMOLI, EDF
37 from SALOME_ContainerHelper import ScriptExecInfo
# Name under which the hosting container is published in the globals of executed scripts.
MY_CONTAINER_ENTRY_IN_GLBS = "my_container"

# Name under which the per-session performance log object is published in script globals.
MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"

# Sentinel emitted on stderr to detect a normally-finished run (see closeEyesOnErrorAtExit).
MY_KEY_TO_DETECT_FINISH = "neib av tuot"
class Generic(SALOME__POA.GenericObj):
    """A Python implementation of the GenericObj CORBA IDL"""
    def __init__(self,poa):
        # NOTE(review): the bodies/def lines of Register, UnRegister, Destroy and
        # __del__ are elided in this view; the lines below belong to several methods.
        #print("Register called : %d"%self.cnt)
        #print("UnRegister called : %d"%self.cnt)
        # Once the last reference is released, deactivate this servant in its POA.
        oid=self.poa.servant_to_id(self)
        self.poa.deactivate_object(oid)
        # Destroy() is kept only for backward compatibility with old SALOME clients.
        print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
        #print("Destuctor called")
class PyNode_i (Engines__POA.PyNode,Generic):
    """The implementation of the PyNode CORBA IDL"""
    def __init__(self, nodeName,code,poa,my_container):
        """Initialize the node : compilation in the local context"""
        Generic.__init__(self,poa)
        self.nodeName=nodeName
        self.my_container=my_container._container
        # Feed linecache so tracebacks display the node source under its name.
        linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
        ccode=compile(code,nodeName,'exec')
        # Expose the hosting container to the executed code.
        self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
        exec(ccode, self.context)

    def getContainer(self):
        # CORBA accessor : container hosting this node.
        return self.my_container

    def defineNewCustomVar(self,varName,valueOfVar):
        # Inject a new variable (pickled payload) into the node execution context.
        self.context[varName] = pickle.loads(valueOfVar)

    def executeAnotherPieceOfCode(self,code):
        """Called for initialization of container lodging self."""
        # NOTE(review): the enclosing try of this handler is elided in this view;
        # the raise below sits in its except clause.
        ccode=compile(code,self.nodeName,'exec')
        exec(ccode, self.context)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))

    def execute(self,funcName,argsin):
        """Execute the function funcName found in local context with pickled args (argsin)"""
        # NOTE(review): try/except structure partially elided in this view.
        argsin,kws=pickle.loads(argsin)
        func=self.context[funcName]
        argsout=func(*argsin,**kws)
        argsout=pickle.dumps(argsout,-1)
        # except clause : forward the formatted traceback to the CORBA client.
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
class SenderByte_i(SALOME__POA.SenderByte,Generic):
    """CORBA servant sending a bytes payload to clients, possibly slice by slice."""
    def __init__(self,poa,bytesToSend):
        Generic.__init__(self,poa)
        self.bytesToSend = bytesToSend

        # getSize (def line elided in this view) : total payload length in bytes.
        return len(self.bytesToSend)

    def sendPart(self,n1,n2):
        # Return the [n1:n2) slice of the payload.
        return self.bytesToSend[n1:n2]
# Reference counter of proxy files : file name -> number of live proxies pointing to it.
DicoForProxyFile = { }
def GetSizeOfBufferedReader(f):
    """
    This method returns in bytes size of a file openned.

    Args:
    ----
    f (io.IOBase): buffered reader returned by open
    """
    # Measure by seeking to the end, then restore the saved position.
    # NOTE(review): the lines saving `pos`, reading the size and returning it are
    # elided in this view.
    f.seek(0,io.SEEK_END)
    f.seek(pos,io.SEEK_SET)
def GetObjectFromFile(fname, visitor = None):
    """Unpickle the object stored in file *fname*, reporting its on-disk footprint to *visitor*."""
    with open(fname,"rb") as f:
        # NOTE(review): the `if visitor:` guard and the pickle.load/return lines are
        # elided in this view.
        visitor.setHDDMem( GetSizeOfBufferedReader(f) )
        visitor.setFileName( fname )
def DumpInFile(obj,fname):
    """Write the already-pickled bytes *obj* into file *fname* (write line elided in this view)."""
    with open(fname,"wb") as f:
def IncrRefInFile(fname):
    """Increment the reference counter associated to proxy file *fname*."""
    if fname in DicoForProxyFile:
        DicoForProxyFile[fname] += 1
    # First registration : counter starts at 2 (creation + this ref).
    # NOTE(review): this assignment presumably sits in an elided `else:` branch.
    DicoForProxyFile[fname] = 2
def DecrRefInFile(fname):
    """Decrement the reference counter of *fname* ; drop entry and file when it reaches zero."""
    # Untracked file : early-return branch elided in this view.
    if fname not in DicoForProxyFile:
    cnt = DicoForProxyFile[fname]
    DicoForProxyFile[fname] -= 1
    # Forget the entry once the last reference disappeared (guard elided in this view).
    del DicoForProxyFile[fname]
    # Physically remove the dump file (unlink line elided in this view).
    if os.path.exists(fname):
def GetBigObjectOnDiskThreshold():
    """Return the size threshold (in bytes) above which objects are proxified on disk."""
    threshold = KernelBasis.GetBigObjOnDiskThreshold()
    return threshold
def ActivateProxyMecanismOrNot( sizeInByte ):
    """Tell whether an object of *sizeInByte* bytes must be dumped on disk behind a proxy."""
    thres = GetBigObjectOnDiskThreshold()
    # NOTE(review): a special-case branch (e.g. disabled threshold) appears elided in this view.
    return sizeInByte > thres
def GetBigObjectDirectory():
    """Return the directory (with env vars and ~ expanded) where big objects are dumped.

    Raises:
    ------
    RuntimeError: when no dump directory has been configured in KernelBasis.
    """
    if not KernelBasis.BigObjOnDiskDirectoryDefined():
        raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
    rawDirectory = KernelBasis.GetBigObjOnDiskDirectory()
    return os.path.expanduser( os.path.expandvars( rawDirectory ) )
def GetBigObjectFileName():
    """
    Return a filename in the most secure manner (see tempfile documentation)
    """
    # The NamedTemporaryFile is deleted on context exit : only its unique name is
    # of interest (the `return f.name` line is elided in this view).
    with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
class BigObjectOnDiskBase:
    """Base of proxy objects standing for a pickled payload dumped into a file on disk."""
    def __init__(self, fileName, objSerialized):
        """
        :param fileName: the file used to dump into.
        :param objSerialized: the object in pickeled form
        :type objSerialized: bytes
        """
        self._filename = fileName
        # attribute _destroy is here to tell client side or server side
        # only client side can be with _destroy set to True. server side due to risk of concurrency
        # so pickled form of self must be done with this attribute set to False.
        self._destroy = False
        self.__dumpIntoFile(objSerialized)

    def getDestroyStatus(self):
        # (body elided in this view ; incrRef/decrRef definitions below are partially elided too)

        IncrRefInFile( self._filename )
        # should never happen !
        # NOTE(review): the exception is instantiated but never raised ; a `raise` looks missing.
        RuntimeError("Invalid call to incrRef !")

        DecrRefInFile( self._filename )
        # should never happen !
        # NOTE(review): same here, `raise` looks missing.
        RuntimeError("Invalid call to decrRef !")

    def unlinkOnDestructor(self):

    def doNotTouchFile(self):
        """
        Method called slave side. The life cycle management of file is client side not slave side.
        """
        self._destroy = False

        # destructor-like cleanup (def line and guard elided in this view)
        DecrRefInFile( self._filename )

    def getFileName(self):
        return self._filename

    def __dumpIntoFile(self, objSerialized):
        DumpInFile( objSerialized, self._filename )

    def get(self, visitor = None):
        # Reload the pickled payload from disk (the return line is elided in this view).
        obj = GetObjectFromFile( self._filename, visitor )

        # __float__ / __int__ / __str__ conversions (def lines elided in this view)
        return float( self.get() )
        return int( self.get() )
        if isinstance(obj,str):
        raise RuntimeError("Not a string")
class BigObjectOnDisk(BigObjectOnDiskBase):
    """Disk proxy specialized for a single (non sequence) big object."""
    def __init__(self, fileName, objSerialized):
        super().__init__( fileName, objSerialized )
class BigObjectOnDiskListElement(BigObjectOnDiskBase):
    """Proxy of one element, at a fixed position, of a big sequence dumped on disk."""
    def __init__(self, pos, length, fileName):
        self._filename = fileName
        # Element proxies never drive the file life cycle.
        self._destroy = False
        # NOTE(review): the `self._pos = pos` assignment appears elided in this view.
        self._length = length

    def get(self, visitor = None):
        # Fetch the whole sequence then pick this element.
        fullObj = BigObjectOnDiskBase.get(self, visitor)
        return fullObj[ self._pos ]

    def __getitem__(self, i):
        # (body elided in this view)

        # __len__ (def line elided) : delegate to the fetched object.
        return len(self.get())
class BigObjectOnDiskSequence(BigObjectOnDiskBase):
    """Common base of list/tuple disk proxies ; remembers the sequence length."""
    def __init__(self, length, fileName, objSerialized):
        BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
        self._length = length

    def __getitem__(self, i):
        # Lazy access : element i is itself a proxy on the same dump file.
        return BigObjectOnDiskListElement(i, self._length, self.getFileName())
class BigObjectOnDiskList(BigObjectOnDiskSequence):
    """Disk proxy specialized for python lists."""
    def __init__(self, length, fileName, objSerialized):
        super().__init__( length, fileName, objSerialized )
class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
    """Disk proxy specialized for python tuples."""
    def __init__(self, length, fileName, objSerialized):
        super().__init__( length, fileName, objSerialized )
def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
    """
    This method return a proxy instance of pickled form of object given in input.

    Args:
    ----
    obj (pickelable type) : object to be proxified
    pickleObjInit (bytes) : Optionnal. Original pickeled form of object to be proxyfied if already computed. If not this method generate it

    Returns
    -------
    BigObjectOnDiskBase: proxy instance
    """
    pickleObj = pickleObjInit
    if pickleObj is None:
        pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
    fileName = GetBigObjectFileName()
    # NOTE(review): an `if visitor:` guard around the two calls below appears elided in this view.
    visitor.setHDDMem( len(pickleObj) )
    visitor.setFileName(fileName)
    # Pick the proxy flavour matching the python type of obj.
    if isinstance( obj, list):
        proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
    elif isinstance( obj, tuple):
        proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
    # fallback branch (`else:`) and final `return proxyObj` elided in this view
    proxyObj = BigObjectOnDisk( fileName , pickleObj )
def SpoolPickleObject( obj, visitor = None ):
    """Pickle *obj* ; replace it by a pickled disk proxy when bigger than the threshold."""
    with InOutputObjVisitorCM(visitor) as v:
        pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
        # Small object : plain pickle is returned (return lines elided in this view).
        if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
        proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
        pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
353 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
def UnProxyObjectSimple( obj, visitor = None ):
    """
    Method to be called in Remote mode. Alterate the obj _status attribute.
    Because the slave process does not participate in the reference counting

    Args:
    ----
    visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
    """
    with InOutputObjVisitorCM(visitor) as v:
        logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
        # Proxy case : fetch the real value (body elided in this view).
        if isinstance(obj,BigObjectOnDiskBase):
        elif isinstance( obj, list):
            # Recurse on list elements (loop header / retObj init elided in this view).
            retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
def UnProxyObjectSimpleLocal( obj ):
    """
    Method to be called in Local mode. Do not alterate the PyObj counter
    """
    # Proxy case : fetch the real value (body elided in this view).
    if isinstance(obj,BigObjectOnDiskBase):
    elif isinstance( obj, list):
        # Recurse on list elements (loop header / retObj init elided in this view).
        retObj.append( UnProxyObjectSimpleLocal(elt) )
    def __init__(self, fileName):
        # (enclosing class -- FileHolder -- header elided in this view)
        self._filename = fileName

        # filename property (decorator/def elided in this view) : path held by this object.
        return self._filename
class FileDeleter(FileHolder):
    """FileHolder variant removing its file from disk when garbage collected."""
    def __init__(self, fileName):
        super().__init__( fileName )

        # __del__ (def line elided in this view) : best-effort removal of the held file.
        if os.path.exists( self._filename ):
            os.unlink( self._filename )
class MonitoringInfo:
    """Value object describing a running monitoring script (script file, period, output file, pid)."""
    def __init__(self, pyFileName, intervalInMs, outFileName, pid):
        self._py_file_name = pyFileName       # FileHolder/FileDeleter of the monitoring python script
        self._interval_in_ms = intervalInMs   # sampling period in milliseconds
        self._out_file_name = outFileName     # FileHolder/FileDeleter of the result file
        # (pid storage elided in this view)

    # (property decorator elided in this view)
    def pyFileName(self):
        return self._py_file_name

    # pid setter (decorators and body partially elided in this view)
    def pid(self, value):

    # (property decorator elided in this view)
    def outFileName(self):
        return self._out_file_name

    # (property decorator elided in this view)
    def intervalInMs(self):
        return self._interval_in_ms
def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
    """
    This method loops indefinitely every intervalInMs milliseconds to scan
    number of inodes and size of content recursively included into the in input directory.

    Args:
    ----
    outFileName (str) : name of file inside the results will be written. If None a new file is generated

    See also CPUMemoryMonitoring
    """
    dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
    # outFileNameSave stores the content of outFileName during phase of dumping
    with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
        outFileNameSave = f.name
    with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
        # (tempPyFile assignment elided in this view)
        tempOutFile = outFileName
        if tempOutFile is None:
            tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
    # Generate the python script run by the monitoring subprocess. The template below
    # is formatted with the local variables ; several template lines (including the
    # opening triple quote) are elided in this view, so no comments are inserted inside it.
    with open(tempPyFile,"w") as f:
import subprocess as sp
  with open("{tempOutFile}","a") as f:
    f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
    f.write( "{{}}\\n".format( "{intervalInMs}" ) )
      nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]), ), shell = True).decode().strip()
        st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
        szOfDirStr = re.split("[\s]+",st)[0]
      f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
      f.write( "{{}}\\n".format( str( nbinodes ) ) )
      f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
    time.sleep( {intervalInMs} / 1000.0 )
""".format( **locals()))
    logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
    # The generated script is destroyed with the returned MonitoringInfo ; the output
    # file is destroyed too only when it was generated here.
    pyFileName = FileDeleter( tempPyFile )
    if outFileName is None:
        outFileName = FileDeleter( tempOutFile )
    # else branch (elided in this view) : caller-supplied file is merely held
    outFileName = FileHolder(outFileName)
    return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
    """
    Launch a subprocess monitoring self process.
    This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
    CPU usage and RSS memory of the calling process.
    Communication between subprocess and self is done by file.

    Args:
    ----
    outFileName (str) : name of file inside the results will be written. If None a new file is generated

    See also FileSystemMonitoring
    """
    def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
        # Generate the psutil-based script executed by the monitoring subprocess.
        with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
            # (tempPyFile assignment elided in this view)
        tempOutFile = outFileName
        if tempOutFile is None:
            tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
        # Template formatted positionally with (pid, tempOutFile, intervalInMs, intervalInMs) ;
        # several template lines are elided in this view, so none are commented inside.
        with open(tempPyFile,"w") as f:
            f.write("""import psutil
process = psutil.Process( pid )
def getChargeOf( p ):
  a,b = p.cpu_percent(), p.memory_info().rss
  for c in p.children():
    a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
with open("{}","a") as f:
  f.write( "{{}}\\n".format( "{}" ) )
    cpu,mem_rss = getChargeOf( process )
    f.write( "{{}}\\n".format( str( cpu ) ) )
    f.write( "{{}}\\n".format( str( mem_rss ) ) )
    time.sleep( {} / 1000.0 )
""".format(pid, tempOutFile, intervalInMs, intervalInMs))
        if outFileName is None:
            autoOutFile = FileDeleter(tempOutFile)
        # else branch elided in this view : caller-supplied file is merely held
        autoOutFile = FileHolder(tempOutFile)
        return FileDeleter(tempPyFile),autoOutFile
    pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
    return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
class GenericPythonMonitoringLauncherCtxMgr:
    """Context manager starting a monitoring subprocess on enter and stopping it on exit."""
    def __init__(self, monitoringParams):
        """
        Args:
        ----
        monitoringParams (MonitoringInfo)
        """
        self._monitoring_params = monitoringParams

        # __enter__ (def line elided in this view) : spawn the monitoring subprocess.
        pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
        self._monitoring_params.pid = pid
        return self._monitoring_params

    def __exit__(self,exctype, exc, tb):
        StopMonitoring( self._monitoring_params )
        # Drop the MonitoringInfo so its FileDeleter members can unlink their files.
        del self._monitoring_params
        gc.collect() # force destruction of objects even in raise context
def StopMonitoring( monitoringInfo ):
    """
    Kill monitoring subprocess.

    Args:
    ----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    """
    KernelBasis.StopMonitoring(monitoringInfo.pid)
    def __init__(self, intervalInMs, cpu, mem_rss):
        """
        (enclosing class -- CPUMemInfo -- header elided in this view)

        cpu (list<float>) CPU usage
        mem_rss (list<int>) rss memory usage
        """
        self._interval_in_ms = intervalInMs
        # Pair the samples : (cpu_percent, rss_bytes) per tick.
        self._data = [(a,b) for a,b in zip(cpu,mem_rss)]

        # __str__ (def line elided in this view)
        st = """Interval in ms : {self.intervalInMs}
""".format( **locals() )

    # (property decorator elided in this view)
    def intervalInMs(self):
        return self._interval_in_ms

        # data property docstring fragment (its def/decorator lines elided in this view)
        """
        list of triplets. First param of pair is cpu usage
        Second param of pair is memory usage
        """
def ReadCPUMemInfoInternal( fileName ):
    """Parse the raw CPU/memory monitoring dump *fileName* into a CPUMemInfo.

    File layout: first line is the sampling interval in ms, then alternating
    cpu-percent / rss-bytes lines, one value per line.

    Args:
    ----
    fileName (str): path of the dump produced by the monitoring subprocess

    Returns
    -------
    CPUMemInfo: parsed samples; empty (interval 0) when the file does not exist.
    """
    # Fix: initialize intervalInMs so the return below does not raise
    # UnboundLocalError when the monitoring file has not been produced yet.
    intervalInMs = 0
    cpu = [] ; mem_rss = []
    if os.path.exists( fileName ):
        with open(fileName, "r") as f:
            coarseData = [ elt.strip() for elt in f.readlines() ]
        intervalInMs = int( coarseData[0] )
        coarseData = coarseData[1:]
        # Even lines are cpu percentages, odd lines are rss values.
        cpu = [float(elt) for elt in coarseData[::2]]
        mem_rss = [ int(elt) for elt in coarseData[1::2]]
    return CPUMemInfo(intervalInMs,cpu,mem_rss)
def ReadCPUMemInfo( monitoringInfo ):
    """
    Retrieve CPU/Mem data of monitoring.

    Args:
    ----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    """
    return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
    def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
        """
        (enclosing class -- InodeSizeInfo -- header elided in this view)

        timeStamps (list<datetimestruct>)
        volumeOfDir (list<str>)
        """
        self._dir_name_monitored = dirNameMonitored
        self._interval_in_ms = intervalInMs
        # Triplet samples : (timestamp, inode count, human readable size) per tick.
        self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]

        # __str__ (def line elided in this view)
        # NOTE(review): the "$" before {self.intervalInMs} looks like a typo in the output.
        st = """Filename monitored : {self.dirNameMonitored}
Interval in ms : ${self.intervalInMs}
""".format( **locals() )

    # (property decorator elided in this view)
    def dirNameMonitored(self):
        return self._dir_name_monitored

    # (property decorator elided in this view)
    def intervalInMs(self):
        return self._interval_in_ms

        # data property docstring fragment (its def/decorator lines elided in this view)
        """
        list of triplets. First param of triplet is datetimestruct
        Second param of triplet is #inodes.
        Thirst param of triplet is size.
        """
def ReadInodeSizeInfoInternal( fileName ):
    """Parse the filesystem-monitoring dump *fileName* into an InodeSizeInfo.

    File layout: monitored directory name, sampling interval in ms, then
    repeating triplets (timestamp, inode count, human readable size).
    """
    with open(fileName, "r") as f:
        lines = [ line.strip() for line in f.readlines() ]
    dirNameMonitored = lines[0]
    intervalInMs = int( lines[1] )
    records = lines[2:]
    # Every third line starting at offsets 0/1/2 holds one field of the triplet.
    timeStamps = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in records[::3] ]
    nbInodes = [ int(elt) for elt in records[1::3] ]
    volumeOfDir = records[2::3]
    return InodeSizeInfo(dirNameMonitored,intervalInMs,timeStamps,nbInodes,volumeOfDir)
def ReadInodeSizeInfo( monitoringInfo ):
    """
    Retrieve nb of inodes and size of monitoring

    Args:
    ----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    """
    return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
class SeqByteReceiver:
    """Client-side helper downloading the payload of a remote SenderByte servant."""
    # 2GB limit to trigger split into chunks
    CHUNK_SIZE = 2000000000
    def __init__(self,sender):
        # (rest of __init__ and the __del__ def line are elided in this view)
        self._obj.UnRegister()

        # data accessor (def line elided) : fetch the whole payload, chunked when over CHUNK_SIZE.
        size = self._obj.getSize()
        if size <= SeqByteReceiver.CHUNK_SIZE:
            return self.fetchOneShot( size )
        # (else line elided in this view)
        return self.fetchByChunks( size )

    def fetchOneShot(self,size):
        # Whole payload in a single CORBA call.
        return self._obj.sendPart(0,size)

    def fetchByChunks(self,size):
        """
        To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
        """
        data_for_split_case = bytes(0)
        EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
        iStart = 0 ; iEnd = EFF_CHUNK_SIZE
        while iStart!=iEnd and iEnd <= size:
            part = self._obj.sendPart(iStart,iEnd)
            # Accumulate with join to avoid quadratic bytes concatenation.
            data_for_split_case = bytes(0).join( [data_for_split_case,part] )
            iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
        return data_for_split_case
# Template of the "main" python script executed by the crash-proof subprocess :
# it rebuilds the execution context from pickle files, runs the user code and
# dumps the requested outputs back into a pickle file. It is formatted
# positionally with (codeFileName, contextFileName, resFileName, outargsname,
# iorScriptLog) -- see InternalExecResistant. Several template lines (imports,
# file-name assignments, closing quotes) are elided in this view.
FinalCode = """import pickle
from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
orb = CORBA.ORB_init([''])
outputFileName = "{}"
exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
with open(inputFileName,"rb") as f:
  context = pickle.load( f )
context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
with open(codeFileName,"r") as f:
exec( code , context )
# filter part of context to be exported to father process
context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
with open(outputFileName,"wb") as f:
  pickle.dump( context, f )
class PythonFunctionEvaluatorParams:
    """Holds the 4 files of one crash-proof sub-process evaluation (main script,
    user code, input context pickle, output context pickle) and their cleanup."""
    def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
        self._main_filename = mainFileName
        self._code_filename = codeFileName
        self._in_context_filename = inContextFileName
        self._out_context_filename = outContextFileName

        # result property (decorator/def elided in this view) : unpickle the output context.
        with open(self._out_context_filename,"rb") as f:
            return pickle.load( f )

    def destroyOnOK(self):
        # Successful run : all 4 temporary files can be removed.
        for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
            if os.path.exists( fileToDestroy ):
                os.unlink( fileToDestroy )

    def destroyOnKO(self, containerRef):
        """
        Called in the context of failure with replay mode activated
        """
        # Keep the replay inputs ; only the produced output context is removed.
        for fileToDestroy in [self._out_context_filename]:
            if os.path.exists( fileToDestroy ):
                os.unlink( fileToDestroy )
        # register to container files group associated to the
        containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])

        # replay-command property (decorator/def elided in this view)
        return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))

    def cleanOperations(self):
        return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )

    def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
        # (branch structure and f-string terminators partially elided in this view ;
        # no comments are inserted inside the multi-line f-strings below)
        return f"return with non zero code ({returnCode})"
        if keepFilesToReplay:
            return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
{self.cleanOperations}
            return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay, closeEyesOnErrorAtExit):
    """
    Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.

    Args:
    ----
    code (str) : python code to be executed using context
    context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
    outargsname (list<str>) : list of arguments to be exported
    containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
    instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
    keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
    closeEyesOnErrorAtExit (bool) : if True in case of crash of subprocess, if MY_KEY_TO_DETECT_FINISH is displayed at the end of stdout

    Returns
    -------
    ScriptExecInfo : instance serverside

    Note
    ----
    context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
    """
    import subprocess as sp

    def IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr ):
        # Decide whether the subprocess run is acceptable and strip the finish
        # sentinel from stderr (branch structure partially elided in this view).
        def StdErrTreatment(closeEyesOnErrorAtExit , stderr):
            if not closeEyesOnErrorAtExit:
                return stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
        return True,StdErrTreatment(closeEyesOnErrorAtExit , stderr)
        if not closeEyesOnErrorAtExit:
            # Tolerant mode : a run whose stderr ends with the sentinel is considered OK.
            if stderr[-len(MY_KEY_TO_DETECT_FINISH):] == MY_KEY_TO_DETECT_FINISH:
                return True,stderr[:-len(MY_KEY_TO_DETECT_FINISH)]

    def InternalExecResistant( code, context, outargsname):
        # Build the 4 files (main script, user code, input context, output context)
        # and run python3 on the main script, retrying per KernelBasis configuration.
        orb = CORBA.ORB_init([''])
        iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
        EXEC_CODE_FNAME_PXF = "execsafe_"
        def RetrieveUniquePartFromPfx( fname ):
            # Unique suffix shared by all companion files of one evaluation.
            return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
        with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
            codeFd.write( "{}\n".format( containerRef.get_startup_code() ) )
            if closeEyesOnErrorAtExit:
sys.stderr.write({!r})
sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
            codeFileName = os.path.basename( codeFd.name )
        contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
        with open(contextFileName,"wb") as contextFd:
            pickle.dump( context, contextFd)
        resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
        mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
        with open(mainExecFileName,"w") as f:
            f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
        for iTry in range( KernelBasis.GetNumberOfRetry() ):
            # NOTE(review): a guard printing this warning only after a failed first
            # attempt (where returnCode is defined) appears elided in this view.
            print( "WARNING : Retry # {}. Following code has generated non zero return code ( {} ). Trying again ... \n{}".format( iTry, returnCode, code ) )
            p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
            stdout, stderr = p.communicate()
            returnCode = p.returncode
        return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)

    ret = instanceOfLogOfCurrentSession._current_instance
    returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
    # Replay captured subprocess streams on our own streams.
    stdout = stdout.decode()
    stderr = stderr.decode()
    sys.stdout.write( stdout ) ; sys.stdout.flush()
    isOK, stderr = IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr )
    sys.stderr.write( stderr ) ; sys.stderr.flush()
    # Success path : retrieve the remote ScriptExecInfo and merge outputs into the caller context.
    pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
    if len(pcklData) > 0:
        ret = pickle.loads( pcklData )
    context.update( evParams.result )
    evParams.destroyOnOK()
    # Tolerated non-zero return code (sentinel seen) : warn only.
    print( "WARNING : Following code has generated non zero return code ( {} ) but considered as OK\n{}".format( returnCode, code ) )
    # Failure path : keep or clean the replay files then raise with full stdout/stderr.
    if keepFilesToReplay:
        evParams.destroyOnKO( containerRef )
        evParams.destroyOnOK()
    raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Crash-proof execution keeping the replay files on failure (strict stderr policy)."""
    return ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
                                  keepFilesToReplay = True, closeEyesOnErrorAtExit = False )
def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Crash-proof execution destroying the replay files on failure (strict stderr policy)."""
    return ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
                                  keepFilesToReplay = False, closeEyesOnErrorAtExit = False )
def ExecCrashProofWithReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Fault-tolerant variant of ExecCrashProofWithReplay (finish sentinel tolerated on stderr)."""
    return ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
                                  keepFilesToReplay = True, closeEyesOnErrorAtExit = True )
def ExecCrashProofWithoutReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Fault-tolerant variant of ExecCrashProofWithoutReplay (finish sentinel tolerated on stderr)."""
    return ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
                                  keepFilesToReplay = False, closeEyesOnErrorAtExit = True )
def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Run *code* directly in the current process inside *context* and return the
    session logger's current ScriptExecInfo. *outargsname* and *containerRef* are
    unused here ; they only mirror the signature of the crash-proof variants."""
    exec( code, context )
    currentInstance = instanceOfLogOfCurrentSession._current_instance
    return currentInstance
class LogOfCurrentExecutionSessionAbs(abc.ABC):
    """Base of per-execution-session loggers feeding a ScriptExecInfo instance."""
    # __init__ (def line elided in this view)
        self._current_instance = ScriptExecInfo()

    def addInfoOnLevel2(self, key, value):
        # Record a measurement on the current ScriptExecInfo.
        setattr(self._current_instance,key,value)

    def addFreestyleAndFlush(self, value):
        # Abstract hook : concrete loggers must push the freestyle value to their sink.
        raise RuntimeError("Must be overloaded")
class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
    """Execution-session logger pushing its ScriptExecInfo to a remote CORBA handle."""
    def __init__(self, handleToCentralizedInst):
        # (base-class __init__ call elided in this view)
        self._remote_handle = handleToCentralizedInst

    def addFreestyleAndFlush(self, value):
        self._current_instance.freestyle = value
        self.finalizeAndPushToMaster()

    def finalizeAndPushToMaster(self):
        """
        Voluntary do nothing in case of problem to avoid to trouble execution
        """
        # try/except wrapper elided in this view : the push is best-effort.
        self._remote_handle.assign( pickle.dumps( self._current_instance ) )
class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
    """
    This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
    """
    def __init__(self, handleToCentralizedInst = None):
        # (body elided in this view)

    def addFreestyleAndFlush(self, value):
        # Stub : nothing to push anywhere (body elided in this view).
class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
    """The implementation of the PyScriptNode CORBA IDL that executes a script"""
    def __init__(self, nodeName, code, poa, my_container, logscript):
        """Initialize the node : compilation in the local context"""
        Generic.__init__(self,poa)
        self.nodeName=nodeName
        # Keep both the python wrapper and the raw CORBA container reference.
        self.my_container_py = my_container
        self.my_container=my_container._container
        # Feed linecache so tracebacks display the script source under the node name.
        linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
        self.ccode=compile(code,nodeName,'exec')
        self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
        self._log_script = logscript
        self._current_execution_session = None
        sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session
    # (abstractmethod decorator elided in this view)
    def executeNow(self, outargsname):
        raise RuntimeError("Must be overloaded")

        # __del__ (def line elided in this view)
        # force removal of self.context. Don t know why it s not done by default
        self.removeAllVarsInContext()
    def getContainer(self):
        # CORBA accessor : container hosting this node.
        return self.my_container

    def defineNewCustomVar(self,varName,valueOfVar):
        # Inject a new variable (pickled payload) into the script context.
        self.context[varName] = pickle.loads(valueOfVar)
    def executeAnotherPieceOfCode(self,code):
        """Called for initialization of container lodging self."""
        # try wrapper elided in this view ; the raise below sits in its except clause.
        ccode=compile(code,self.nodeName,'exec')
        exec(ccode, self.context)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))

    def assignNewCompiledCode(self,codeStr):
        # Replace the compiled script (try/except wrapper partially elided in this view).
        self.ccode=compile(codeStr,self.nodeName,'exec')
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))
    def executeSimple(self, key, val):
        """
        Same as execute method except that no pickelization mecanism is implied here. No output is expected
        """
        # try wrapper elided in this view.
        self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
        exec(self.ccode,self.context)
        # except clause : forward the formatted traceback to the client.
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))
    def execute(self,outargsname,argsin):
        """Execute the script stored in attribute ccode with pickled args (argsin)"""
        # try wrapper and argsout initialisation elided in this view.
        argsname,kws=pickle.loads(argsin)
        self.context.update(kws)
        exec(self.ccode, self.context)
        # Collect the requested outputs from the context.
        for arg in outargsname:
            if arg not in self.context:
                raise KeyError("There is no variable %s in context" % arg)
            argsout.append(self.context[arg])
        argsout=pickle.dumps(tuple(argsout),-1)
        # except clause : forward the formatted traceback to the client.
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))
    def executeFirst(self,argsin):
        """ Same than first part of self.execute to reduce memory peak."""
        def ArgInMananger(self,argsin):
            # Receive and unpickle the input kwargs, logging their in-memory size.
            argsInPy = SeqByteReceiver( argsin )
            data = argsInPy.data()
            self.addInfoOnLevel2("inputMem",len(data))
            _,kws=pickle.loads(data)
            # (return of kws elided in this view)
        # try wrapper elided in this view.
        self.beginOfCurrentExecutionSession()
        self.addTimeInfoOnLevel2("startInputTime")
        # to force call of SeqByteReceiver's destructor
        kws = ArgInMananger(self,argsin)
        vis = InOutputObjVisitor()
        # (loop header over kws keys elided in this view)
        # fetch real data if necessary
        kws[elt] = UnProxyObjectSimple( kws[elt],vis)
        self.addInfoOnLevel2("inputHDDMem",vis)
        self.context.update(kws)
        self.addTimeInfoOnLevel2("endInputTime")
        # except clause : forward the formatted traceback to the client.
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))
    def executeSecond(self,outargsname):
        """ Same than second part of self.execute to reduce memory peak."""
        def executeSecondInternal(monitoringtimeresms):
            # Run the script while sampling CPU/RSS of the container process.
            with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( monitoringtimeresms ) ) as monitoringParams:
                currentInstance = self.executeNow( outargsname )
                cpumeminfo = ReadCPUMemInfo( monitoringParams )
            return cpumeminfo, currentInstance
        # try wrapper elided in this view.
        self.addTimeInfoOnLevel2("startExecTime")
        self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
        cpumeminfo, self._current_execution_session._current_instance = executeSecondInternal( self.my_container_py.monitoringtimeresms() )
        self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
        self.addTimeInfoOnLevel2("endExecTime")
        self.addTimeInfoOnLevel2("startOutputTime")
        # Collect the requested outputs from the context (argsout init elided in this view).
        for arg in outargsname:
            if arg not in self.context:
                raise KeyError("There is no variable %s in context" % arg)
            argsout.append(self.context[arg])
        # Ship each output through a SenderByte servant (loop header / counters elided in this view).
        vis = InOutputObjVisitor()
        # the proxy mecanism is catched here
        argPickle = SpoolPickleObject( arg, vis )
        retArg = SenderByte_i( self.poa,argPickle )
        id_o = self.poa.activate_object(retArg)
        retObj = self.poa.id_to_reference(id_o)
        ret.append( retObj._narrow( SALOME.SenderByte ) )
        outputMem += len(argPickle)
        self.addInfoOnLevel2("outputMem",outputMem)
        self.addInfoOnLevel2("outputHDDMem",vis)
        self.addTimeInfoOnLevel2("endOutputTime")
        self.endOfCurrentExecutionSession()
        # (return of ret elided in this view) ; except clause below.
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))
def listAllVarsInContext(self):
    """Return the names of the user variables held in the execution context.

    Dunder entries (e.g. ``__builtins__``) and the container bookkeeping
    entry are filtered out, so only script-level variables are listed.
    """
    import re  # local import: no module-level 're' import is guaranteed in scope
    pat = re.compile("^__([a-z]+)__$")
    return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]
def removeAllVarsInContext(self):
    """Purge every user-defined variable from the execution context."""
    for name in self.listAllVarsInContext():
        self.context.pop(name)
def getValueOfVarInContext(self,varName):
    """Return the value of context variable *varName*, pickled with protocol -1.

    :param varName: name of the variable to fetch from self.context.
    :return: bytes, the pickled value.
    :raises SALOME.SALOME_Exception: if the variable is missing or unpicklable.
    """
    try:
        return pickle.dumps(self.context[varName],-1)
    except Exception:
        exc_typ,exc_val,exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
def assignVarInContext(self, varName, value):
    """Unpickle *value* and store it into element 0 of context variable *varName*.

    NOTE(review): this writes into ``self.context[varName][0]`` — the context
    entry is expected to already be a mutable sequence; confirm with callers.

    :param varName: name of the (pre-existing) context variable.
    :param value: bytes, a pickled Python object.
    :raises SALOME.SALOME_Exception: on unpickling or assignment failure.
    """
    try:
        self.context[varName][0] = pickle.loads(value)
    except Exception:
        exc_typ,exc_val,exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
def callMethodOnVarInContext(self, varName, methodName, args):
    """Invoke *methodName* on ``self.context[varName][0]`` and return the pickled result.

    :param varName: name of the context variable holding the target object (element 0).
    :param methodName: name of the method to call on that object.
    :param args: bytes, a pickled argument tuple for the call.
    :return: bytes, the pickled return value (protocol -1).
    :raises SALOME.SALOME_Exception: on lookup, call, or (un)pickling failure.
    """
    try:
        return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
    except Exception:
        exc_typ,exc_val,exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
def beginOfCurrentExecutionSession(self):
    """Open a fresh execution-session log and expose it to the executed script."""
    session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
    self._current_execution_session = session
    # the script can reach its own performance log through this context entry
    self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = session
def endOfCurrentExecutionSession(self):
    """Flush the current session log to the master and forget the session."""
    session = self._current_execution_session
    session.finalizeAndPushToMaster()
    self._current_execution_session = None
def addInfoOnLevel2(self, key, value):
    """Forward a (key, value) measurement entry to the active session log."""
    session = self._current_execution_session
    session.addInfoOnLevel2(key, value)
def addTimeInfoOnLevel2(self, key):
    """Stamp *key* in the active session log with the current wall-clock time."""
    import datetime
    session = self._current_execution_session
    session.addInfoOnLevel2(key, datetime.datetime.now())
class PyScriptNode_i(PyScriptNode_Abstract_i):
    """PyScriptNode that runs the script inside the current container process."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        """Delegate the whole initialization to the abstract base class."""
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        """Execute the pre-compiled code locally and return the result."""
        session = self._current_execution_session
        return ExecLocal(self.ccode, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
    """PyScriptNode that runs the script in a separate process (no replay on crash)."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        """Delegate the whole initialization to the abstract base class."""
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        """Execute the source code out of process, crash-proof, without replay."""
        session = self._current_execution_session
        return ExecCrashProofWithoutReplay(self.code, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
    """PyScriptNode that runs the script in a separate process with replay support."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        """Delegate the whole initialization to the abstract base class."""
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        """Execute the source code out of process, crash-proof, with replay."""
        session = self._current_execution_session
        return ExecCrashProofWithReplay(self.code, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_FT_i(PyScriptNode_Abstract_i):
    """Fault-tolerant variant of the out-of-process PyScriptNode (no replay)."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        """Delegate the whole initialization to the abstract base class."""
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        """Execute the source code out of process, fault-tolerant, without replay."""
        session = self._current_execution_session
        return ExecCrashProofWithoutReplayFT(self.code, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_Replay_FT_i(PyScriptNode_Abstract_i):
    """Fault-tolerant variant of the out-of-process PyScriptNode with replay support."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        """Delegate the whole initialization to the abstract base class."""
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        """Execute the source code out of process, fault-tolerant, with replay."""
        session = self._current_execution_session
        return ExecCrashProofWithReplayFT(self.code, self.context, outargsname, self.my_container, session)