1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # File : SALOME_PyNode.py
22 # Author : Christian CAREMOLI, EDF
37 from SALOME_ContainerHelper import ScriptExecInfo
# Key under which the hosting container object is exposed in the globals of
# every executed script (see PyNode_i / PyScriptNode_Abstract_i __init__).
MY_CONTAINER_ENTRY_IN_GLBS = "my_container"

# Key under which the per-session performance logger is exposed in the globals
# of executed scripts (see FinalCode template below).
MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"

# Sentinel string written on stderr by a crash-proof subprocess to signal a
# clean end of execution (used by the closeEyesOnErrorAtExit / FT modes).
MY_KEY_TO_DETECT_FINISH = "neib av tuot"
class Generic(SALOME__POA.GenericObj):
    """A Python implementation of the GenericObj CORBA IDL"""
    # NOTE(review): several method 'def' lines are absent from this excerpt
    # (the original numbering jumps); the orphan statements below belong to
    # the Register/UnRegister/Destroy/__del__ methods of the upstream class.
    def __init__(self,poa):
        #print("Register called : %d"%self.cnt)
        #print("UnRegister called : %d"%self.cnt)
        # Deactivate the CORBA servant once the last reference is released.
        oid=self.poa.servant_to_id(self)
        self.poa.deactivate_object(oid)
        print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
        #print("Destuctor called")
class PyNode_i (Engines__POA.PyNode,Generic):
    """The implementation of the PyNode CORBA IDL"""
    # NOTE(review): this excerpt is incomplete (original numbering jumps);
    # missing lines are flagged below and all statements are kept verbatim.
    def __init__(self, nodeName,code,poa,my_container):
        """Initialize the node : compilation in the local context"""
        Generic.__init__(self,poa)
        self.nodeName=nodeName
        # NOTE(review): orig. line 76 absent (presumably stores 'code') — confirm upstream
        self.my_container=my_container._container
        # Register the source in linecache so tracebacks show the node's code.
        linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
        ccode=compile(code,nodeName,'exec')
        # NOTE(review): orig. line 80 absent (presumably initializes self.context) — confirm upstream
        self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
        exec(ccode, self.context)

    def getContainer(self):
        # CORBA reference of the container hosting this node
        return self.my_container

    def defineNewCustomVar(self,varName,valueOfVar):
        # WARNING: pickle.loads on peer-provided bytes — trusted channel assumed
        self.context[varName] = pickle.loads(valueOfVar)

    def executeAnotherPieceOfCode(self,code):
        """Called for initialization of container lodging self."""
        # NOTE(review): the try: line is absent from this excerpt
        ccode=compile(code,self.nodeName,'exec')
        exec(ccode, self.context)
        # NOTE(review): the except line is absent from this excerpt
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))

    def execute(self,funcName,argsin):
        """Execute the function funcName found in local context with pickled args (argsin)"""
        # NOTE(review): the try: line is absent from this excerpt
        argsin,kws=pickle.loads(argsin)
        func=self.context[funcName]
        argsout=func(*argsin,**kws)
        argsout=pickle.dumps(argsout,-1)
        # NOTE(review): return/except lines are absent from this excerpt
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
class SenderByte_i(SALOME__POA.SenderByte,Generic):
    """CORBA servant exposing a bytes payload to remote callers part by part."""
    def __init__(self,poa,bytesToSend):
        Generic.__init__(self,poa)
        self.bytesToSend = bytesToSend
        # NOTE(review): the 'def getSize(self):' line is absent from this
        # excerpt; the next statement is its body.
        return len(self.bytesToSend)

    def sendPart(self,n1,n2):
        # Return the half-open slice [n1, n2) of the payload.
        return self.bytesToSend[n1:n2]
# Module-level reference counter: proxy file name -> number of live references
# (see IncrRefInFile / DecrRefInFile).
DicoForProxyFile = { }
def GetSizeOfBufferedReader(f):
    """
    Return the size in bytes of an already-opened file.

    The current stream position is preserved.

    Args:
    -----
    f (io.IOBase): buffered reader returned by open

    Returns:
    --------
    int : total size in bytes of the underlying content
    """
    pos = f.tell()            # remember the caller's position so we can restore it
    f.seek(0,io.SEEK_END)     # jump to the end to measure the total size
    size = f.tell()
    f.seek(pos,io.SEEK_SET)   # restore the original position
    return size
def GetObjectFromFile(fname, visitor = None):
    """
    Unpickle and return the object stored in file fname.

    Args:
    -----
    fname (str) : file containing a pickled object (written by DumpInFile)
    visitor (InOutputObjVisitor) : optional, notified of the on-disk size and file name

    Returns:
    --------
    the unpickled object
    """
    with open(fname,"rb") as f:
        if visitor:
            # report the HDD footprint before consuming the stream
            visitor.setHDDMem( GetSizeOfBufferedReader(f) )
            visitor.setFileName( fname )
        # WARNING: pickle.load must only be fed trusted data
        return pickle.load(f)
def DumpInFile(obj,fname):
    """
    Write obj (an already-pickled bytes payload — see callers in
    BigObjectOnDiskBase) into file fname, truncating any previous content.
    """
    with open(fname,"wb") as f:
        f.write(obj)
def IncrRefInFile(fname):
    """
    Increment the reference counter attached to proxy file fname.

    A file seen for the first time gets counter 2: the creator's implicit
    reference plus the one being registered now (see DecrRefInFile).
    """
    if fname in DicoForProxyFile:
        DicoForProxyFile[fname] += 1
    else:
        # As shown in the excerpt this assignment ran unconditionally,
        # clobbering any incremented count; it must be the else branch.
        DicoForProxyFile[fname] = 2
def DecrRefInFile(fname):
    """
    Decrement the reference counter attached to proxy file fname and remove
    the file from disk once the last reference disappears.
    """
    if fname not in DicoForProxyFile:
        # untracked file: treat as the last (implicit) reference
        cnt = 1
    else:
        cnt = DicoForProxyFile[fname]
        DicoForProxyFile[fname] -= 1
        if cnt == 1:
            del DicoForProxyFile[fname]
    if cnt == 1:
        # last reference gone -> best-effort removal of the dump file
        if os.path.exists(fname):
            os.unlink( fname )
def GetBigObjectOnDiskThreshold():
    """Return the size threshold (in bytes) above which pickled objects are proxified to disk."""
    return KernelBasis.GetBigObjOnDiskThreshold()
def ActivateProxyMecanismOrNot( sizeInByte ):
    # Tell whether a pickled payload of sizeInByte bytes must be proxified to disk.
    thres = GetBigObjectOnDiskThreshold()
    # NOTE(review): lines between these two statements are absent from this
    # excerpt (orig. 187-189); presumably a special case for a disabled
    # threshold — TODO confirm against upstream.
    return sizeInByte > thres
def GetBigObjectDirectory():
    # Directory where proxified big objects are dumped; must be configured beforehand.
    # NOTE(review): one line (orig. 193) is absent from this excerpt.
    if not KernelBasis.BigObjOnDiskDirectoryDefined():
        raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
    # Expand ~ and environment variables in the configured path.
    return os.path.expanduser( os.path.expandvars( KernelBasis.GetBigObjOnDiskDirectory() ) )
def GetBigObjectFileName():
    """
    Return a filename in the most secure manner (see tempfile documentation).

    Returns:
    --------
    str : a unique "mem_*.pckl" path inside GetBigObjectDirectory(); the
          temporary file itself is deleted on context exit, only its reserved
          name is returned for the caller to (re)create.
    """
    with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
        ret = f.name
    return ret
class BigObjectOnDiskBase:
    # Base class of proxies standing for a big pickled object stored on disk.
    # NOTE(review): this excerpt is incomplete (original numbering jumps);
    # several 'def' lines are absent and their orphan bodies are kept verbatim.
    def __init__(self, fileName, objSerialized):
        """
        :param fileName: the file used to dump into.
        :param objSerialized: the object in pickeled form
        :type objSerialized: bytes
        """
        self._filename = fileName
        # attribute _destroy is here to tell client side or server side
        # only client side can be with _destroy set to True. server side due to risk of concurrency
        # so pickled form of self must be done with this attribute set to False.
        self._destroy = False
        self.__dumpIntoFile(objSerialized)

    def getDestroyStatus(self):
        # NOTE(review): body absent; the statements below belong to the
        # incrRef/decrRef methods whose def/try lines are absent from this excerpt.
        IncrRefInFile( self._filename )
        # should never happen !
        RuntimeError("Invalid call to incrRef !")   # NOTE(review): exception built but never raised — likely missing 'raise'
        DecrRefInFile( self._filename )
        # should never happen !
        RuntimeError("Invalid call to decrRef !")   # NOTE(review): exception built but never raised — likely missing 'raise'

    def unlinkOnDestructor(self):
        # NOTE(review): body absent from this excerpt (presumably sets _destroy True)
    def doNotTouchFile(self):
        """
        Method called slave side. The life cycle management of file is client side not slave side.
        """
        self._destroy = False
        # NOTE(review): the following belongs to __del__ (def line absent):
        DecrRefInFile( self._filename )

    def getFileName(self):
        return self._filename

    def __dumpIntoFile(self, objSerialized):
        # Serialized payload is written as-is; reference counting is separate.
        DumpInFile( objSerialized, self._filename )

    def get(self, visitor = None):
        # Load the real object back from disk (return line absent from excerpt).
        obj = GetObjectFromFile( self._filename, visitor )
        # NOTE(review): the following orphan lines belong to __float__/__int__/__str__
        # whose def lines are absent from this excerpt.
        return float( self.get() )
        return int( self.get() )
        if isinstance(obj,str):
            raise RuntimeError("Not a string")
class BigObjectOnDisk(BigObjectOnDiskBase):
    """Proxy of a scalar (non-sequence) big object stored on disk."""
    def __init__(self, fileName, objSerialized):
        super().__init__(fileName, objSerialized)
class BigObjectOnDiskListElement(BigObjectOnDiskBase):
    # Proxy of a single element of a sequence proxified on disk.
    def __init__(self, pos, length, fileName):
        # Deliberately bypasses BigObjectOnDiskBase.__init__: the file already
        # exists and its life cycle is managed by the owning sequence.
        self._filename = fileName
        self._destroy = False
        # NOTE(review): the line storing 'pos' (orig. 282) is absent from this excerpt
        self._length = length

    def get(self, visitor = None):
        # Fetch the whole sequence from disk, then select this element.
        fullObj = BigObjectOnDiskBase.get(self, visitor)
        return fullObj[ self._pos ]

    def __getitem__(self, i):
        # NOTE(review): body absent from this excerpt; the next statement is
        # the body of __len__ (its def line is also absent).
        return len(self.get())
class BigObjectOnDiskSequence(BigObjectOnDiskBase):
    # Common base of list/tuple proxies: stores the sequence length so that
    # element proxies can be built without loading the data.
    def __init__(self, length, fileName, objSerialized):
        BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
        self._length = length

    def __getitem__(self, i):
        # Return a lazy per-element proxy instead of loading the sequence.
        return BigObjectOnDiskListElement(i, self._length, self.getFileName())
class BigObjectOnDiskList(BigObjectOnDiskSequence):
    """Proxy of a Python list proxified on disk."""
    def __init__(self, length, fileName, objSerialized):
        super().__init__(length, fileName, objSerialized)
class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
    """Proxy of a Python tuple proxified on disk."""
    def __init__(self, length, fileName, objSerialized):
        super().__init__(length, fileName, objSerialized)
def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
    """
    This method return a proxy instance of pickled form of object given in input.

    Args:
    -----
    obj (pickelable type) : object to be proxified
    pickleObjInit (bytes) : Optional. Original pickled form of object to be
      proxified if already computed. If not, this method generates it.
    visitor (InOutputObjVisitor) : optional, notified of HDD footprint and file name

    Returns:
    --------
    BigObjectOnDiskBase: proxy instance
    """
    pickleObj = pickleObjInit
    if pickleObj is None:
        pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
    fileName = GetBigObjectFileName()
    if visitor:
        visitor.setHDDMem( len(pickleObj) )
        visitor.setFileName(fileName)
    # Sequences get dedicated proxies so elements can be accessed lazily.
    if isinstance( obj, list):
        proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
    elif isinstance( obj, tuple):
        proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
    else:
        proxyObj = BigObjectOnDisk( fileName , pickleObj )
    return proxyObj
def SpoolPickleObject( obj, visitor = None ):
    """
    Pickle obj and, when the payload exceeds the big-object threshold,
    replace it by a pickled disk proxy (see ProxyfyPickeled).

    Returns:
    --------
    bytes : pickled form of either obj itself or its proxy
    """
    with InOutputObjVisitorCM(visitor) as v:
        pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
        if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
            # small enough: ship the payload directly
            return pickleObjInit
        else:
            proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
            pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
            return pickleProxy
353 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
def UnProxyObjectSimple( obj, visitor = None ):
    """
    Method to be called in Remote mode. Alterate the obj _status attribute.
    Because the slave process does not participate in the reference counting

    Args:
    -----
    visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
    """
    with InOutputObjVisitorCM(visitor) as v:
        logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
        if isinstance(obj,BigObjectOnDiskBase):
            # NOTE(review): branch body absent from this excerpt (presumably
            # fetches the real data via obj.get) — confirm upstream
        elif isinstance( obj, list):
            # NOTE(review): loop header / accumulator init / return lines are
            # absent from this excerpt; the orphan statement is kept verbatim.
                retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
def UnProxyObjectSimpleLocal( obj ):
    """
    Method to be called in Local mode. Do not alterate the PyObj counter
    """
    if isinstance(obj,BigObjectOnDiskBase):
        # NOTE(review): branch body absent from this excerpt — confirm upstream
    elif isinstance( obj, list):
        # NOTE(review): loop header / accumulator init / return lines are
        # absent from this excerpt; the orphan statement is kept verbatim.
            retObj.append( UnProxyObjectSimpleLocal(elt) )
# NOTE(review): the enclosing 'class FileHolder' statement is absent from this
# excerpt; the following lines are its __init__ and 'filename' accessor bodies
# (accessed without call at e.g. monitoringInfo.outFileName.filename, so the
# accessor is presumably a @property upstream).
def __init__(self, fileName):
    self._filename = fileName
    # body of the 'filename' accessor (its def/decorator lines are absent):
    return self._filename
class FileDeleter(FileHolder):
    # A FileHolder that removes its file from disk on destruction.
    def __init__(self, fileName):
        super().__init__( fileName )
        # NOTE(review): the 'def __del__' line is absent from this excerpt;
        # the following is its body: best-effort removal of the owned file.
        if os.path.exists( self._filename ):
            os.unlink( self._filename )
class MonitoringInfo:
    # Value object describing a launched monitoring subprocess.
    def __init__(self, pyFileName, intervalInMs, outFileName, pid):
        self._py_file_name = pyFileName       # FileHolder/FileDeleter of the monitoring script (see FileSystemMonitoring)
        self._interval_in_ms = intervalInMs   # sampling period in milliseconds
        self._out_file_name = outFileName     # FileHolder/FileDeleter of the result file
        # NOTE(review): storage of 'pid' (orig. 412-414) is absent from this excerpt

    # NOTE(review): decorator lines are absent below; call sites access these
    # as plain attributes (e.g. monitoringInfo.pid), so they are presumably
    # @property / setter methods upstream — confirm.
    def pyFileName(self):
        return self._py_file_name

    def pid(self, value):
        # setter body absent from this excerpt
    def outFileName(self):
        return self._out_file_name
    def intervalInMs(self):
        return self._interval_in_ms
def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
    """
    This method loops indefinitely every intervalInMs milliseconds to scan
    number of inodes and size of content recursively included into the in input directory.

    Args:
    -----
    outFileName (str) : name of file inside the results will be written. If None a new file is generated

    See also CPUMemoryMonitoring
    """
    # NOTE(review): this excerpt is incomplete (original numbering jumps);
    # the generated-script template below is kept strictly verbatim.
    dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
    # outFileNameSave stores the content of outFileName during phase of dumping
    with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
        outFileNameSave = f.name
    with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
        # NOTE(review): line storing f.name into tempPyFile (orig. 456) absent
        tempOutFile = outFileName
        if tempOutFile is None:
            tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
        with open(tempPyFile,"w") as f:
            # NOTE(review): the f.write(""" opener (orig. 461) is absent; the
            # following lines are the monitoring-script template, verbatim.
import subprocess as sp
with open("{tempOutFile}","a") as f:
  f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
  f.write( "{{}}\\n".format( "{intervalInMs}" ) )
    nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]), ), shell = True).decode().strip()
      st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
      szOfDirStr = re.split("[\s]+",st)[0]
      f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
      f.write( "{{}}\\n".format( str( nbinodes ) ) )
      f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
    time.sleep( {intervalInMs} / 1000.0 )
""".format( **locals()))
    logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
    # Script file is always temporary; result file is kept when caller named it.
    pyFileName = FileDeleter( tempPyFile )
    if outFileName is None:
        outFileName = FileDeleter( tempOutFile )
    else:
        outFileName = FileHolder(outFileName)
    return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
    """
    Launch a subprocess monitoring self process.
    This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
    CPU usage and RSS memory of the calling process.
    Communication between subprocess and self is done by file.

    Args:
    -----
    outFileName (str) : name of file inside the results will be written. If None a new file is generated

    See also FileSystemMonitoring
    """
    # NOTE(review): this excerpt is incomplete (original numbering jumps);
    # the generated-script template below is kept strictly verbatim.
    def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
        # Generate the psutil-based monitoring script and its output file.
        with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
            # NOTE(review): line storing f.name into tempPyFile (orig. 514) absent
            tempOutFile = outFileName
            if tempOutFile is None:
                tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
        with open(tempPyFile,"w") as f:
            f.write("""import psutil
process = psutil.Process( pid )
def getChargeOf( p ):
  a,b = p.cpu_percent(), p.memory_info().rss
  for c in p.children():
    a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
with open("{}","a") as f:
  f.write( "{{}}\\n".format( "{}" ) )
    cpu,mem_rss = getChargeOf( process )
    f.write( "{{}}\\n".format( str( cpu ) ) )
    f.write( "{{}}\\n".format( str( mem_rss ) ) )
    time.sleep( {} / 1000.0 )
""".format(pid, tempOutFile, intervalInMs, intervalInMs))
        if outFileName is None:
            autoOutFile = FileDeleter(tempOutFile)
        else:
            autoOutFile = FileHolder(tempOutFile)
        return FileDeleter(tempPyFile),autoOutFile
    pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
    return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
class GenericPythonMonitoringLauncherCtxMgr:
    # Context manager starting a monitoring subprocess on enter and stopping it on exit.
    def __init__(self, monitoringParams):
        """
        Args:
        -----
        monitoringParams (MonitoringInfo)
        """
        self._monitoring_params = monitoringParams
        # NOTE(review): the 'def __enter__' line is absent from this excerpt;
        # the following is its body: launch the monitoring script, record pid.
        pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
        self._monitoring_params.pid = pid
        return self._monitoring_params

    def __exit__(self,exctype, exc, tb):
        StopMonitoring( self._monitoring_params )
        del self._monitoring_params
        gc.collect() # force destruction of objects even in raise context
def StopMonitoring( monitoringInfo ):
    """
    Kill monitoring subprocess.

    Args:
    -----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    """
    KernelBasis.StopMonitoring(monitoringInfo.pid)
# NOTE(review): the enclosing 'class CPUMemInfo' statement is absent from this
# excerpt; the following lines are its members, kept verbatim.
def __init__(self, intervalInMs, cpu, mem_rss):
    """
    Args:
    -----
    cpu (list<float>) CPU usage
    mem_rss (list<int>) rss memory usage
    """
    self._interval_in_ms = intervalInMs
    # Pair CPU and memory samples taken at the same tick.
    self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
    # NOTE(review): the 'def __repr__/__str__' line is absent; a template line
    # inside the string below (orig. 593) is also absent.
    st = """Interval in ms : {self.intervalInMs}
""".format( **locals() )

# Accessed as an attribute at call sites -> presumably @property upstream.
def intervalInMs(self):
    return self._interval_in_ms

# NOTE(review): docstring fragment of the 'data' accessor (surrounding def
# lines absent from this excerpt):
#   list of triplets. First param of pair is cpu usage
#   Second param of pair is memory usage
def ReadCPUMemInfoInternal( fileName ):
    # Parse the alternating cpu/mem dump file produced by the CPUMemoryMonitoring script.
    # NOTE(review): orig. line 608 absent (presumably initializes intervalInMs
    # for the missing-file case) and 618-619 absent — confirm upstream.
    cpu = [] ; mem_rss = []
    if os.path.exists( fileName ):
        with open(fileName, "r") as f:
            coarseData = [ elt.strip() for elt in f.readlines() ]
        intervalInMs = int( coarseData[0] )
        coarseData = coarseData[1:]
        cpu = [float(elt) for elt in coarseData[::2]]       # even entries: CPU percent
        mem_rss = [ int(elt) for elt in coarseData[1::2]]   # odd entries: RSS in bytes
    return CPUMemInfo(intervalInMs,cpu,mem_rss)
def ReadCPUMemInfo( monitoringInfo ):
    """
    Retrieve CPU/Mem data of monitoring.

    Args:
    -----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring

    Returns:
    --------
    CPUMemInfo : parsed CPU / RSS-memory samples
    """
    return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
# NOTE(review): the enclosing 'class InodeSizeInfo' statement is absent from
# this excerpt; the following lines are its members, kept verbatim.
def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
    """
    Args:
    -----
    timeStamps (list<datetimestruct>)
    volumeOfDir (list<str>)
    """
    self._dir_name_monitored = dirNameMonitored
    self._interval_in_ms = intervalInMs
    # One triplet per sample: (timestamp, #inodes, human-readable size).
    self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
    # NOTE(review): the 'def __repr__/__str__' line is absent; a template line
    # inside the string below (orig. 651) is also absent.
    st = """Filename monitored : {self.dirNameMonitored}
Interval in ms : ${self.intervalInMs}
""".format( **locals() )

# Accessed as attributes at call sites -> presumably @property upstream.
def dirNameMonitored(self):
    return self._dir_name_monitored
def intervalInMs(self):
    return self._interval_in_ms

# NOTE(review): docstring fragment of the 'data' accessor (surrounding def
# lines absent from this excerpt):
#   list of triplets. First param of triplet is datetimestruct
#   Second param of triplet is #inodes.
#   Thirst param of triplet is size.
def ReadInodeSizeInfoInternal( fileName ):
    # Parse the timestamp/inode-count/size triplets dumped by the FileSystemMonitoring script.
    # NOTE(review): orig. lines 670-671 are absent from this excerpt.
    with open(fileName, "r") as f:
        coarseData = [ elt.strip() for elt in f.readlines() ]
    # Header: monitored directory name then sampling interval; rest are triplets.
    dirNameMonitored = coarseData[0] ; intervalInMs = int( coarseData[1] ) ; coarseData = coarseData[2:]
    tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
    nbInodes = [int(elt) for elt in coarseData[1::3]]
    volumeOfDir = coarseData[2::3]   # human-readable sizes from "du -sh" (strings)
    return InodeSizeInfo(dirNameMonitored,intervalInMs,tss,nbInodes,volumeOfDir)
def ReadInodeSizeInfo( monitoringInfo ):
    """
    Retrieve nb of inodes and size of monitoring.

    Args:
    -----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring

    Returns:
    --------
    InodeSizeInfo : parsed timestamp / inode-count / size samples
    """
    return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
class SeqByteReceiver:
    # Client-side helper fetching the payload exposed by a SenderByte servant,
    # splitting transfers above CHUNK_SIZE to cap per-call message size.
    # 2GB limit to trigger split into chunks
    CHUNK_SIZE = 2000000000
    def __init__(self,sender):
        # NOTE(review): lines storing 'sender' (orig. 698-699) are absent; the
        # next statement belongs to a release method / __del__ (def line absent).
        self._obj.UnRegister()
        # NOTE(review): 'def data(self):' line absent; the following is its body.
        size = self._obj.getSize()
        if size <= SeqByteReceiver.CHUNK_SIZE:
            return self.fetchOneShot( size )
        # NOTE(review): 'else:' line (orig. 706) absent
            return self.fetchByChunks( size )
    def fetchOneShot(self,size):
        # Single CORBA call: payload fits under the chunk limit.
        return self._obj.sendPart(0,size)
    def fetchByChunks(self,size):
        """
        To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
        """
        data_for_split_case = bytes(0)
        EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
        iStart = 0 ; iEnd = EFF_CHUNK_SIZE
        # Loop ends when iStart == iEnd == size (min() clamps iEnd to size).
        while iStart!=iEnd and iEnd <= size:
            part = self._obj.sendPart(iStart,iEnd)
            data_for_split_case = bytes(0).join( [data_for_split_case,part] )
            iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
        return data_for_split_case
# Driver-script template executed by the crash-proof subprocess (see
# InternalExecResistant): it loads the pickled context, runs the user code and
# pickles the filtered context back for the father process. Placeholders are
# filled by FinalCode.format(codeFileName, contextFileName, resFileName,
# outargsname, iorScriptLog).
# NOTE(review): several template lines are absent from this excerpt; the
# string content below is kept strictly verbatim.
FinalCode = """import pickle
from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
orb = CORBA.ORB_init([''])
outputFileName = "{}"
exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
with open(inputFileName,"rb") as f:
  context = pickle.load( f )
context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
with open(codeFileName,"r") as f:
exec( code , context )
# filter part of context to be exported to father process
context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
with open(outputFileName,"wb") as f:
  pickle.dump( context, f )
class PythonFunctionEvaluatorParams:
    # Bundle of the four files driving one crash-proof subprocess execution:
    # the main driver script, the user code, the pickled input context and the
    # pickled output context.
    def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
        self._main_filename = mainFileName
        self._code_filename = codeFileName
        self._in_context_filename = inContextFileName
        self._out_context_filename = outContextFileName
        # NOTE(review): the 'result' accessor def/decorator lines (orig.
        # 756-758) are absent; the following is its body: unpickle the
        # context produced by the subprocess.
        with open(self._out_context_filename,"rb") as f:
            return pickle.load( f )

    def destroyOnOK(self):
        # Successful run: all four working files can go.
        for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
            if os.path.exists( fileToDestroy ):
                os.unlink( fileToDestroy )

    def destroyOnKO(self, containerRef):
        """
        Called in the context of failure with replay mode activated
        """
        # Only the (possibly partial) output context is removed; the replay
        # files are handed over to the container's log-file group.
        for fileToDestroy in [self._out_context_filename]:
            if os.path.exists( fileToDestroy ):
                os.unlink( fileToDestroy )
        # register to container files group associated to the
        containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
        # NOTE(review): the 'replay' accessor def/decorator lines are absent;
        # the following is its body.
        return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))

    def cleanOperations(self):
        # Shell one-liner removing the replay files left on disk.
        return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )
def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
    # Build the human-readable diagnostic attached to a subprocess exit code.
    # NOTE(review): several lines are absent from this excerpt (original
    # numbering jumps); the f-string literals below are therefore truncated
    # and are kept strictly verbatim.
    return f"return with non zero code ({returnCode})"
    if keepFilesToReplay:
        return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
{self.cleanOperations}
        return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay, closeEyesOnErrorAtExit):
    """
    Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.

    Args:
    -----
    code (str) : python code to be executed using context
    context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
    outargsname (list<str>) : list of arguments to be exported
    containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
    instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
    keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
    closeEyesOnErrorAtExit (bool) : if True in case of crash of subprocess, if MY_KEY_TO_DETECT_FINISH is displayed at the end of stdout

    Returns:
    --------
    ScriptExecInfo : instance serverside

    context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
    """
    # NOTE(review): this excerpt is incomplete (original numbering jumps);
    # missing lines are flagged and all statements are kept verbatim.
    import subprocess as sp

    def IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr ):
        # Decide whether a non-zero/zero exit is acceptable, optionally
        # trusting the MY_KEY_TO_DETECT_FINISH sentinel at the end of stderr.
        def StdErrTreatment(closeEyesOnErrorAtExit , stderr):
            if not closeEyesOnErrorAtExit:
                # NOTE(review): return line absent from this excerpt
                return stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
        # NOTE(review): the 'if returnCode == 0'-style guard is absent
        return True,StdErrTreatment(closeEyesOnErrorAtExit , stderr)
        if not closeEyesOnErrorAtExit:
            # NOTE(review): return line absent from this excerpt
        if stderr[-len(MY_KEY_TO_DETECT_FINISH):] == MY_KEY_TO_DETECT_FINISH:
            return True,stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
        # NOTE(review): final return (failure case) absent from this excerpt

    def InternalExecResistant( code, context, outargsname):
        # Materialize code / context / driver script on disk, then run the
        # driver in a subprocess, retrying up to KernelBasis.GetNumberOfRetry().
        orb = CORBA.ORB_init([''])
        iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
        EXEC_CODE_FNAME_PXF = "execsafe_"
        def RetrieveUniquePartFromPfx( fname ):
            # Extract the unique token tempfile generated inside the file name.
            return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
        with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
            codeFd.write( "{}\n".format( containerRef.get_startup_code() ) )
            # NOTE(review): line writing 'code' (orig. 859) absent
            if closeEyesOnErrorAtExit:
                # NOTE(review): the codeFd.write(""" opener is absent; the
                # following template lines are kept verbatim.
sys.stderr.write({!r})
sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
            # NOTE(review): codeFd.flush() line (orig. 865) absent
            codeFileName = os.path.basename( codeFd.name )
            contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
            with open(contextFileName,"wb") as contextFd:
                pickle.dump( context, contextFd)
            resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
            mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
            with open(mainExecFileName,"w") as f:
                f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
            for iTry in range( KernelBasis.GetNumberOfRetry() ):
                # NOTE(review): the 'if iTry > 0:' guard is absent from this excerpt
                print( "WARNING : Retry # {}. Following code has generated non zero return code ( {} ). Trying again ... \n{}".format( iTry, returnCode, code ) )
                p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
                stdout, stderr = p.communicate()
                returnCode = p.returncode
                # NOTE(review): early-exit on success (orig. 880-881) absent
            return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)

    ret = instanceOfLogOfCurrentSession._current_instance
    returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
    stdout = stdout.decode()
    stderr = stderr.decode()
    sys.stdout.write( stdout ) ; sys.stdout.flush()
    isOK, stderr = IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr )
    sys.stderr.write( stderr ) ; sys.stderr.flush()
    # NOTE(review): the 'if isOK:' guard (orig. 890) is absent
    pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
    if len(pcklData) > 0:
        ret = pickle.loads( pcklData )
    context.update( evParams.result )
    evParams.destroyOnOK()
    # NOTE(review): the guard before this warning (orig. 896) is absent
    print( "WARNING : Following code has generated non zero return code ( {} ) but considered as OK\n{}".format( returnCode, code ) )
    # NOTE(review): return / else lines (orig. 898-899) are absent
    if keepFilesToReplay:
        evParams.destroyOnKO( containerRef )
    # NOTE(review): 'else:' line (orig. 902) absent
        evParams.destroyOnOK()
    raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Crash-proof execution keeping the replay files on failure; no stderr-sentinel tolerance."""
    return ExecCrashProofGeneric(code, context, outargsname, containerRef,
                                 instanceOfLogOfCurrentSession,
                                 keepFilesToReplay = True,
                                 closeEyesOnErrorAtExit = False)
def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Crash-proof execution destroying working files on failure; no stderr-sentinel tolerance."""
    return ExecCrashProofGeneric(code, context, outargsname, containerRef,
                                 instanceOfLogOfCurrentSession,
                                 keepFilesToReplay = False,
                                 closeEyesOnErrorAtExit = False)
def ExecCrashProofWithReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Fault-tolerant variant of ExecCrashProofWithReplay: trusts the MY_KEY_TO_DETECT_FINISH stderr sentinel."""
    return ExecCrashProofGeneric(code, context, outargsname, containerRef,
                                 instanceOfLogOfCurrentSession,
                                 keepFilesToReplay = True,
                                 closeEyesOnErrorAtExit = True)
def ExecCrashProofWithoutReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Fault-tolerant variant of ExecCrashProofWithoutReplay: trusts the MY_KEY_TO_DETECT_FINISH stderr sentinel."""
    return ExecCrashProofGeneric(code, context, outargsname, containerRef,
                                 instanceOfLogOfCurrentSession,
                                 keepFilesToReplay = False,
                                 closeEyesOnErrorAtExit = True)
def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Execute *code* directly in the current process (no crash protection); *context* is updated in place."""
    exec( code, context )
    # Mirror the crash-proof variants' return value: the current ScriptExecInfo.
    currentInstance = instanceOfLogOfCurrentSession._current_instance
    return currentInstance
class LogOfCurrentExecutionSessionAbs(abc.ABC):
    # Base of per-execution-session loggers accumulating ScriptExecInfo measurements.
    # NOTE(review): the '__init__' def line is absent from this excerpt; the
    # orphan statement below is its body.
        self._current_instance = ScriptExecInfo()

    def addInfoOnLevel2(self, key, value):
        # Store one measurement on the current ScriptExecInfo.
        setattr(self._current_instance,key,value)

    def addFreestyleAndFlush(self, value):
        raise RuntimeError("Must be overloaded")
class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
    # Concrete session logger pushing pickled ScriptExecInfo to a remote handle.
    def __init__(self, handleToCentralizedInst):
        # NOTE(review): base-class __init__ call (orig. 935) absent from this excerpt
        self._remote_handle = handleToCentralizedInst

    def addFreestyleAndFlush(self, value):
        self._current_instance.freestyle = value
        self.finalizeAndPushToMaster()

    def finalizeAndPushToMaster(self):
        """
        Voluntary do nothing in case of problem to avoid to trouble execution
        """
        # NOTE(review): the try/except wrapper lines are absent from this excerpt
        self._remote_handle.assign( pickle.dumps( self._current_instance ) )
class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
    """
    This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
    """
    def __init__(self, handleToCentralizedInst = None):
        # NOTE(review): body absent from this excerpt
    def addFreestyleAndFlush(self, value):
        # NOTE(review): body absent from this excerpt (stub: presumably no-op)
960 class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
961 """The implementation of the PyScriptNode CORBA IDL that executes a script"""
962 def __init__(self, nodeName, code, poa, my_container, logscript):
963 """Initialize the node : compilation in the local context"""
964 Generic.__init__(self,poa)
965 self.nodeName=nodeName
967 self.my_container_py = my_container
968 self.my_container=my_container._container
969 linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
970 self.ccode=compile(code,nodeName,'exec')
972 self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
973 self._log_script = logscript
974 self._current_execution_session = None
975 sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session
978 def executeNow(self, outargsname):
979 raise RuntimeError("Must be overloaded")
982 # force removal of self.context. Don t know why it s not done by default
983 self.removeAllVarsInContext()
986 def getContainer(self):
987 return self.my_container
995 def defineNewCustomVar(self,varName,valueOfVar):
996 self.context[varName] = pickle.loads(valueOfVar)
999 def executeAnotherPieceOfCode(self,code):
1000 """Called for initialization of container lodging self."""
1002 ccode=compile(code,self.nodeName,'exec')
1003 exec(ccode, self.context)
1005 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
1007 def assignNewCompiledCode(self,codeStr):
1010 self.ccode=compile(codeStr,self.nodeName,'exec')
1012 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))
1014 def executeSimple(self, key, val):
1016 Same as execute method except that no pickelization mecanism is implied here. No output is expected
1019 self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
1020 exec(self.ccode,self.context)
1022 exc_typ,exc_val,exc_fr=sys.exc_info()
1023 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1024 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1025 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))
1027 def execute(self,outargsname,argsin):
1028 """Execute the script stored in attribute ccode with pickled args (argsin)"""
1030 argsname,kws=pickle.loads(argsin)
1031 self.context.update(kws)
1032 exec(self.ccode, self.context)
1034 for arg in outargsname:
1035 if arg not in self.context:
1036 raise KeyError("There is no variable %s in context" % arg)
1037 argsout.append(self.context[arg])
1038 argsout=pickle.dumps(tuple(argsout),-1)
1041 exc_typ,exc_val,exc_fr=sys.exc_info()
1042 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1043 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1044 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))
1046 def executeFirst(self,argsin):
1047 """ Same than first part of self.execute to reduce memory peak."""
1048 def ArgInMananger(self,argsin):
1049 argsInPy = SeqByteReceiver( argsin )
1050 data = argsInPy.data()
1051 self.addInfoOnLevel2("inputMem",len(data))
1052 _,kws=pickle.loads(data)
1055 self.beginOfCurrentExecutionSession()
1056 self.addTimeInfoOnLevel2("startInputTime")
1057 # to force call of SeqByteReceiver's destructor
1058 kws = ArgInMananger(self,argsin)
1059 vis = InOutputObjVisitor()
1061 # fetch real data if necessary
1062 kws[elt] = UnProxyObjectSimple( kws[elt],vis)
1063 self.addInfoOnLevel2("inputHDDMem",vis)
1064 self.context.update(kws)
1065 self.addTimeInfoOnLevel2("endInputTime")
1067 exc_typ,exc_val,exc_fr=sys.exc_info()
1068 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1069 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1070 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))
def executeSecond(self, outargsname):
    """Second half of self.execute, split out to reduce the memory peak.

    Runs the already-loaded script under CPU/memory monitoring, then pickles
    the requested output variables and exposes each one as a SALOME
    SenderByte CORBA object.

    :param outargsname: iterable of variable names to extract from the context.
    :return: list of SALOME.SenderByte references, one per output variable.
    :raises SALOME.SALOME_Exception: wrapping any failure (including a
        missing output variable), with the full traceback embedded.
    """
    def executeSecondInternal(monitoringtimeresms):
        # Execute the user code while a monitor samples CPU/memory at the
        # given resolution; the monitor stops when the context manager exits.
        with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( monitoringtimeresms ) ) as monitoringParams:
            currentInstance = self.executeNow( outargsname )
            cpumeminfo = ReadCPUMemInfo( monitoringParams )
        return cpumeminfo, currentInstance
    try:
        self.addTimeInfoOnLevel2("startExecTime")
        # hoisted: monitoringtimeresms() is a remote call — fetch it once
        timeResolution = self.my_container_py.monitoringtimeresms()
        self.addInfoOnLevel2("measureTimeResolution", timeResolution)
        cpumeminfo, self._current_execution_session._current_instance = executeSecondInternal( timeResolution )
        self.addInfoOnLevel2("CPUMemDuringExec", cpumeminfo)
        self.addTimeInfoOnLevel2("endExecTime")
        self.addTimeInfoOnLevel2("startOutputTime")
        argsout = []
        for arg in outargsname:
            if arg not in self.context:
                raise KeyError("There is no variable %s in context" % arg)
            argsout.append(self.context[arg])
        ret = []
        outputMem = 0
        vis = InOutputObjVisitor()
        for arg in argsout:
            # the proxy mecanism is catched here
            argPickle = SpoolPickleObject( arg, vis )
            retArg = SenderByte_i( self.poa, argPickle )
            id_o = self.poa.activate_object(retArg)
            retObj = self.poa.id_to_reference(id_o)
            ret.append( retObj._narrow( SALOME.SenderByte ) )
            outputMem += len(argPickle)
        self.addInfoOnLevel2("outputMem", outputMem)
        self.addInfoOnLevel2("outputHDDMem", vis)
        self.addTimeInfoOnLevel2("endOutputTime")
        self.endOfCurrentExecutionSession()
        return ret
    except Exception:
        exc_typ, exc_val, exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ, exc_val, exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))
def listAllVarsInContext(self):
    """Return the names of the user variables held in the execution context.

    Dunder entries (``__name__``-style keys) and the container bootstrap
    entry are filtered out.
    """
    dunder = re.compile("^__([a-z]+)__$")
    names = []
    for name in self.context:
        if dunder.match(name) is None and name != MY_CONTAINER_ENTRY_IN_GLBS:
            names.append(name)
    return names
def removeAllVarsInContext(self):
    """Drop every user variable from the execution context."""
    for varName in self.listAllVarsInContext():
        self.context.pop(varName)
def getValueOfVarInContext(self, varName):
    """Pickle and return the value of *varName* from the execution context.

    :param varName: name of the context variable to read.
    :return: bytes — the value pickled with the highest protocol (-1).
    :raises SALOME.SALOME_Exception: if the variable is absent or not picklable.
    """
    try:
        return pickle.dumps(self.context[varName], -1)
    except Exception:
        exc_typ, exc_val, exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ, exc_val, exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
def assignVarInContext(self, varName, value):
    """Unpickle *value* and store it into slot 0 of the named context variable.

    The context entry is expected to be a one-element mutable holder (list);
    only its first cell is overwritten, so existing references to the holder
    stay valid (consistent with callMethodOnVarInContext).

    :param varName: name of the holder variable in the context.
    :param value: pickled bytes of the new value.
    :raises SALOME.SALOME_Exception: on unpickling failure or missing variable.
    """
    try:
        # NOTE(review): pickle.loads on externally supplied bytes — trusted-peer assumption
        self.context[varName][0] = pickle.loads(value)
    except Exception:
        exc_typ, exc_val, exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ, exc_val, exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
def callMethodOnVarInContext(self, varName, methodName, args):
    """Invoke a method on the object held in slot 0 of a context variable.

    :param varName: name of the holder variable in the context.
    :param methodName: attribute name of the method to call on the held object.
    :param args: pickled tuple of positional arguments.
    :return: bytes — the method's result pickled with protocol -1.
    :raises SALOME.SALOME_Exception: on lookup, unpickling or call failure.
    """
    try:
        # NOTE(review): pickle.loads on externally supplied bytes — trusted-peer assumption
        return pickle.dumps( getattr(self.context[varName][0], methodName)(*pickle.loads(args)), -1 )
    except Exception:
        exc_typ, exc_val, exc_fr = sys.exc_info()
        l = traceback.format_exception(exc_typ, exc_val, exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
def beginOfCurrentExecutionSession(self):
    """Open a fresh execution-session log and expose it to the executed script."""
    session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
    self._current_execution_session = session
    # make the log reachable from inside the user script via a well-known global
    self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = session
def endOfCurrentExecutionSession(self):
    """Flush the current session log to the master process and detach it."""
    session = self._current_execution_session
    session.finalizeAndPushToMaster()
    self._current_execution_session = None
def addInfoOnLevel2(self, key, value):
    """Record a (key, value) measurement on the current execution-session log."""
    session = self._current_execution_session
    session.addInfoOnLevel2(key, value)
def addTimeInfoOnLevel2(self, key):
    """Timestamp *key* with the current wall-clock time on the session log."""
    from datetime import datetime
    now = datetime.now()
    self._current_execution_session.addInfoOnLevel2(key, now)
class PyScriptNode_i(PyScriptNode_Abstract_i):
    """Script node executed in-process, inside the hosting container."""
    def __init__(self, nodeName, code, poa, my_container, logscript):
        PyScriptNode_Abstract_i.__init__(self, nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        # run the pre-compiled code directly in this process' context
        session = self._current_execution_session
        return ExecLocal(self.ccode, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
    """Script node executed in a separate process (no replay on crash)."""
    def __init__(self, nodeName, code, poa, my_container, logscript):
        PyScriptNode_Abstract_i.__init__(self, nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        session = self._current_execution_session
        return ExecCrashProofWithoutReplay(self.code, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
    """Script node executed in a separate process, with replay support on crash."""
    def __init__(self, nodeName, code, poa, my_container, logscript):
        PyScriptNode_Abstract_i.__init__(self, nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        session = self._current_execution_session
        return ExecCrashProofWithReplay(self.code, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_FT_i(PyScriptNode_Abstract_i):
    """Fault-tolerant out-of-process script node (no replay on crash)."""
    def __init__(self, nodeName, code, poa, my_container, logscript):
        PyScriptNode_Abstract_i.__init__(self, nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        session = self._current_execution_session
        return ExecCrashProofWithoutReplayFT(self.code, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_Replay_FT_i(PyScriptNode_Abstract_i):
    """Fault-tolerant out-of-process script node with replay support on crash."""
    def __init__(self, nodeName, code, poa, my_container, logscript):
        PyScriptNode_Abstract_i.__init__(self, nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        session = self._current_execution_session
        return ExecCrashProofWithReplayFT(self.code, self.context, outargsname, self.my_container, session)