# -*- coding: iso-8859-1 -*-
# Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
# File   : SALOME_PyNode.py
# Author : Christian CAREMOLI, EDF
import sys
import traceback
import linecache
import pickle
import io
import os
import re
import abc
import logging
import tempfile
import Engines__POA
import SALOME__POA
import SALOME
import KernelBasis
from SALOME_ContainerHelper import ScriptExecInfo

MY_CONTAINER_ENTRY_IN_GLBS = "my_container"

MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
class Generic(SALOME__POA.GenericObj):
  """A Python implementation of the GenericObj CORBA IDL"""
  def __init__(self,poa):
    self.poa = poa
    self.cnt = 1

  def Register(self):
    #print("Register called : %d"%self.cnt)
    self.cnt += 1

  def UnRegister(self):
    #print("UnRegister called : %d"%self.cnt)
    self.cnt -= 1
    if self.cnt <= 0:
      oid = self.poa.servant_to_id(self)
      self.poa.deactivate_object(oid)

  def Destroy(self):
    print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
    self.UnRegister()

  def __del__(self):
    #print("Destructor called")
    pass
class PyNode_i (Engines__POA.PyNode,Generic):
  """The implementation of the PyNode CORBA IDL"""
  def __init__(self, nodeName, code, poa, my_container):
    """Initialize the node: compile the code in the local context."""
    Generic.__init__(self,poa)
    self.nodeName = nodeName
    self.code = code
    self.my_container = my_container._container
    linecache.cache[nodeName] = 0, None, code.split('\n'), nodeName
    ccode = compile(code, nodeName, 'exec')
    self.context = {}
    self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
    exec(ccode, self.context)

  def getContainer(self):
    return self.my_container

  def defineNewCustomVar(self,varName,valueOfVar):
    self.context[varName] = pickle.loads(valueOfVar)
  def executeAnotherPieceOfCode(self,code):
    """Called for initialization of the container lodging self."""
    try:
      ccode = compile(code, self.nodeName, 'exec')
      exec(ccode, self.context)
    except Exception:
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyNode (%s) : code to be executed \"%s\"" % (self.nodeName,code),0))
  def execute(self,funcName,argsin):
    """Execute the function funcName found in the local context with pickled args (argsin)."""
    try:
      argsin,kws = pickle.loads(argsin)
      func = self.context[funcName]
      argsout = func(*argsin,**kws)
      argsout = pickle.dumps(argsout,-1)
      return argsout
    except Exception:
      exc_typ,exc_val,exc_fr = sys.exc_info()
      l = traceback.format_exception(exc_typ,exc_val,exc_fr)
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
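
# Hedged illustration (assumed client-side usage, not part of the original
# module): arguments travel to PyNode_i.execute as one pickled (args, kwargs)
# pair and the result comes back pickled. `pyNodeRef` stands for a CORBA
# reference to a PyNode whose code defines the function being called.
def _examplePyNodeExecute(pyNodeRef, funcName="echo"):
  argsin = pickle.dumps( (("hello",), {}) )   # (args tuple, kwargs dict)
  return pickle.loads( pyNodeRef.execute(funcName, argsin) )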
class SenderByte_i(SALOME__POA.SenderByte,Generic):
  def __init__(self,poa,bytesToSend):
    Generic.__init__(self,poa)
    self.bytesToSend = bytesToSend

  def getSize(self):
    return len(self.bytesToSend)

  def sendPart(self,n1,n2):
    return self.bytesToSend[n1:n2]
# Module-level reference counter: maps a proxy file name to the number of
# proxy objects currently pointing at it.
DicoForProxyFile = { }
def GetSizeOfBufferedReader(f):
  """
  Return the size, in bytes, of an already opened file.

  Args:
  -----
  f (io.IOBase): buffered reader returned by open

  Returns
  -------
  int: size of the file in bytes
  """
  pos = f.tell()
  f.seek(0,io.SEEK_END)
  size = f.tell()
  f.seek(pos,io.SEEK_SET)
  return size
def GetObjectFromFile(fname, visitor = None):
  with open(fname,"rb") as f:
    if visitor:
      visitor.setHDDMem( GetSizeOfBufferedReader(f) )
      visitor.setFileName( fname )
    obj = pickle.load(f)
  return obj

def DumpInFile(obj,fname):
  with open(fname,"wb") as f:
    f.write( obj )
def IncrRefInFile(fname):
  if fname in DicoForProxyFile:
    DicoForProxyFile[fname] += 1
  else:
    DicoForProxyFile[fname] = 2

def DecrRefInFile(fname):
  if fname not in DicoForProxyFile:
    cnt = 1
  else:
    cnt = DicoForProxyFile[fname]
    DicoForProxyFile[fname] -= 1
    if cnt == 1:
      del DicoForProxyFile[fname]
  if cnt == 1:
    if os.path.exists(fname):
      os.unlink( fname )
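
# Hedged illustration of the intended life cycle of a proxy file under this
# reference counting (names reused from above, flow inferred from the code):
#
#   fname = GetBigObjectFileName()
#   DumpInFile( pickle.dumps([1,2,3], pickle.HIGHEST_PROTOCOL), fname )
#   IncrRefInFile( fname )   # first extra reference: counter starts at 2
#   DecrRefInFile( fname )   # back to 1, file kept on disk
#   DecrRefInFile( fname )   # counter exhausted, file unlinked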
def GetBigObjectOnDiskThreshold():
  return KernelBasis.GetBigObjOnDiskThreshold()

def ActivateProxyMecanismOrNot( sizeInByte ):
  thres = GetBigObjectOnDiskThreshold()
  if thres == -1:
    return False
  else:
    return sizeInByte > thres
def GetBigObjectDirectory():
  if not KernelBasis.BigObjOnDiskDirectoryDefined():
    raise RuntimeError("An object with a size above the threshold was detected, but no directory is specified to dump it into a file!")
  return os.path.expanduser( os.path.expandvars( KernelBasis.GetBigObjOnDiskDirectory() ) )

def GetBigObjectFileName():
  """
  Return a file name generated in the most secure manner possible (see the tempfile documentation).
  """
  with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
    ret = f.name
  return ret
class BigObjectOnDiskBase:
  def __init__(self, fileName, objSerialized):
    """
    :param fileName: the file to dump into.
    :param objSerialized: the object in pickled form
    :type objSerialized: bytes
    """
    self._filename = fileName
    # The _destroy attribute tells the client side and the server side apart:
    # only the client side may have _destroy set to True; the server side must
    # not, due to the risk of concurrent deletion. Consequently the pickled
    # form of self must always be produced with this attribute set to False.
    self._destroy = False
    self.__dumpIntoFile(objSerialized)

  def getDestroyStatus(self):
    return self._destroy

  def incrRef(self):
    if self._destroy:
      IncrRefInFile( self._filename )
    else:
      # should never happen!
      raise RuntimeError("Invalid call to incrRef!")

  def decrRef(self):
    if self._destroy:
      DecrRefInFile( self._filename )
    else:
      # should never happen!
      raise RuntimeError("Invalid call to decrRef!")

  def unlinkOnDestructor(self):
    self._destroy = True

  def doNotTouchFile(self):
    """
    Method called on the slave side. The life-cycle management of the file belongs to the client side, not to the slave side.
    """
    self._destroy = False

  def __del__(self):
    if self._destroy:
      DecrRefInFile( self._filename )

  def getFileName(self):
    return self._filename

  def __dumpIntoFile(self, objSerialized):
    DumpInFile( objSerialized, self._filename )

  def get(self, visitor = None):
    obj = GetObjectFromFile( self._filename, visitor )
    return obj

  def __float__(self):
    return float( self.get() )

  def __int__(self):
    return int( self.get() )

  def __str__(self):
    obj = self.get()
    if isinstance(obj,str):
      return obj
    else:
      raise RuntimeError("Not a string")
class BigObjectOnDisk(BigObjectOnDiskBase):
  def __init__(self, fileName, objSerialized):
    BigObjectOnDiskBase.__init__(self, fileName, objSerialized)

class BigObjectOnDiskListElement(BigObjectOnDiskBase):
  def __init__(self, pos, length, fileName):
    self._filename = fileName
    self._destroy = False
    self._pos = pos
    self._length = length

  def get(self, visitor = None):
    fullObj = BigObjectOnDiskBase.get(self, visitor)
    return fullObj[ self._pos ]

  def __getitem__(self, i):
    return self.get()[i]

  def __len__(self):
    return len(self.get())

class BigObjectOnDiskSequence(BigObjectOnDiskBase):
  def __init__(self, length, fileName, objSerialized):
    BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
    self._length = length

  def __getitem__(self, i):
    return BigObjectOnDiskListElement(i, self._length, self.getFileName())

  def __len__(self):
    return self._length

class BigObjectOnDiskList(BigObjectOnDiskSequence):
  def __init__(self, length, fileName, objSerialized):
    BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)

class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
  def __init__(self, length, fileName, objSerialized):
    BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
  """
  This method returns a proxy instance of the pickled form of the object given in input.

  Args:
  -----
  obj (pickleable type) : object to be proxified
  pickleObjInit (bytes) : Optional. The original pickled form of the object to be proxified, if already computed. If not, this method generates it.

  Returns
  -------
  BigObjectOnDiskBase: proxy instance
  """
  pickleObj = pickleObjInit
  if pickleObj is None:
    pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
  fileName = GetBigObjectFileName()
  if visitor:
    visitor.setHDDMem( len(pickleObj) )
    visitor.setFileName(fileName)
  if isinstance( obj, list):
    proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
  elif isinstance( obj, tuple):
    proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
  else:
    proxyObj = BigObjectOnDisk( fileName , pickleObj )
  return proxyObj

def SpoolPickleObject( obj, visitor = None ):
  with InOutputObjVisitorCM(visitor) as v:
    pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
    if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
      return pickleObjInit
    else:
      proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
      pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
      return pickleProxy
from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
def UnProxyObjectSimple( obj, visitor = None ):
  """
  Method to be called in Remote mode. Alters the _destroy attribute of obj,
  because the slave process does not take part in the reference counting.

  Args:
  -----
  obj : the object to be unproxified
  visitor (InOutputObjVisitor): a visitor to keep track of the amount of memory on chip and on HDD
  """
  with InOutputObjVisitorCM(visitor) as v:
    logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
    if isinstance(obj,BigObjectOnDiskBase):
      obj.doNotTouchFile()
      return obj.get( v )
    elif isinstance( obj, list):
      retObj = []
      for elt in obj:
        retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
      return retObj
    else:
      return obj

def UnProxyObjectSimpleLocal( obj ):
  """
  Method to be called in Local mode. Does not alter the reference counter of obj.
  """
  if isinstance(obj,BigObjectOnDiskBase):
    return obj.get()
  elif isinstance( obj, list):
    retObj = []
    for elt in obj:
      retObj.append( UnProxyObjectSimpleLocal(elt) )
    return retObj
  else:
    return obj
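
# Hedged sketch (not part of the original module): the expected round trip of
# the proxy mechanism. SpoolPickleObject returns plain pickled bytes below the
# threshold, or the pickled form of a BigObjectOnDisk* proxy above it; in both
# cases UnProxyObjectSimpleLocal resolves back to the real value.
def _exampleProxyRoundTrip():
  payload = list(range(10))
  spooled = SpoolPickleObject( payload )         # bytes, possibly of a proxy
  maybeProxy = pickle.loads( spooled )
  return UnProxyObjectSimpleLocal( maybeProxy )  # always equal to payload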
class FileHolder:
  def __init__(self, fileName):
    self._filename = fileName

  @property
  def filename(self):
    return self._filename

class FileDeleter(FileHolder):
  def __init__(self, fileName):
    super().__init__( fileName )

  def __del__(self):
    if os.path.exists( self._filename ):
      os.unlink( self._filename )
class MonitoringInfo:
  def __init__(self, pyFileName, intervalInMs, outFileName, pid):
    self._py_file_name = pyFileName
    self._interval_in_ms = intervalInMs
    self._out_file_name = outFileName
    self._pid = pid

  @property
  def pyFileName(self):
    return self._py_file_name

  @property
  def pid(self):
    return self._pid

  @pid.setter
  def pid(self, value):
    self._pid = value

  @property
  def outFileName(self):
    return self._out_file_name

  @property
  def intervalInMs(self):
    return self._interval_in_ms
def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
  """
  This method loops indefinitely, scanning every intervalInMs milliseconds the number of inodes
  and the size of the content recursively contained in the input directory.

  Args:
  -----
  intervalInMs (int) : delay in milliseconds between two consecutive scans
  dirNameToInspect (str) : directory to be monitored
  outFileName (str) : name of the file into which the results will be written. If None a new file is generated.

  See also CPUMemoryMonitoring
  """
  dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
  # outFileNameSave stores the content of outFileName during the dump phase
  with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
    outFileNameSave = f.name
  with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
    tempPyFile = f.name
  tempOutFile = outFileName
  if tempOutFile is None:
    tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
  with open(tempPyFile,"w") as f:
    f.write("""
import subprocess as sp
import re
import time
import datetime
with open("{tempOutFile}","a") as f:
  f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
  f.write( "{{}}\\n".format( "{intervalInMs}" ) )
  while True:
    nbinodes = -1
    try:
      nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]), ), shell = True).decode().strip()
    except Exception:
      pass
    szOfDirStr = "fail"
    try:
      st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
      szOfDirStr = re.split("[\s]+",st)[0]
    except Exception:
      pass
    f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
    f.write( "{{}}\\n".format( str( nbinodes ) ) )
    f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
    f.flush()
    time.sleep( {intervalInMs} / 1000.0 )
""".format( **locals()))
  logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
  pyFileName = FileDeleter( tempPyFile )
  if outFileName is None:
    outFileName = FileDeleter( tempOutFile )
  else:
    outFileName = FileHolder(outFileName)
  return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
  """
  Launch a subprocess monitoring the current process.
  This monitoring subprocess is a Python process that evaluates, every intervalInMs milliseconds,
  the CPU usage and the RSS memory of the calling process.
  Communication between the subprocess and self is done through a file.

  Args:
  -----
  intervalInMs (int) : delay in milliseconds between two consecutive measurements
  outFileName (str) : name of the file into which the results will be written. If None a new file is generated.

  See also FileSystemMonitoring
  """
  def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
    pid = os.getpid()   # pid of the current (monitored) process
    with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
      tempPyFile = f.name
    tempOutFile = outFileName
    if tempOutFile is None:
      tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
    with open(tempPyFile,"w") as f:
      f.write("""import psutil
pid = {}
process = psutil.Process( pid )
def getChargeOf( p ):
  a,b = p.cpu_percent(), p.memory_info().rss
  try:
    for c in p.children():
      a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
  except Exception:
    pass
  return a,b
import time
with open("{}","a") as f:
  f.write( "{{}}\\n".format( "{}" ) )
  while True:
    cpu,mem_rss = getChargeOf( process )
    f.write( "{{}}\\n".format( str( cpu ) ) )
    f.write( "{{}}\\n".format( str( mem_rss ) ) )
    f.flush()
    time.sleep( {} / 1000.0 )
""".format(pid, tempOutFile, intervalInMs, intervalInMs))
    if outFileName is None:
      autoOutFile = FileDeleter(tempOutFile)
    else:
      autoOutFile = FileHolder(tempOutFile)
    return FileDeleter(tempPyFile),autoOutFile
  pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
  return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
class GenericPythonMonitoringLauncherCtxMgr:
  def __init__(self, monitoringParams):
    """
    Args:
    -----
    monitoringParams (MonitoringInfo)
    """
    self._monitoring_params = monitoringParams

  def __enter__(self):
    pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
    self._monitoring_params.pid = pid
    return self._monitoring_params

  def __exit__(self,exctype, exc, tb):
    StopMonitoring( self._monitoring_params )

def StopMonitoring( monitoringInfo ):
  """
  Kill the monitoring subprocess.

  Args:
  -----
  monitoringInfo (MonitoringInfo): the info returned by LaunchMonitoring
  """
  KernelBasis.StopMonitoring(monitoringInfo.pid)
class CPUMemInfo:
  def __init__(self, intervalInMs, cpu, mem_rss):
    """
    Args:
    -----
    intervalInMs (int)
    cpu (list<float>) CPU usage
    mem_rss (list<int>) RSS memory usage
    """
    self._interval_in_ms = intervalInMs
    self._data = [(a,b) for a,b in zip(cpu,mem_rss)]

  def __str__(self):
    st = """Interval in ms : {self.intervalInMs}
Data : {self.data}
""".format( **locals() )
    return st

  @property
  def intervalInMs(self):
    return self._interval_in_ms

  @property
  def data(self):
    """
    List of pairs. The first element of each pair is the CPU usage;
    the second element is the memory usage.
    """
    return self._data
def ReadCPUMemInfoInternal( fileName ):
  intervalInMs = 0
  cpu = [] ; mem_rss = []
  if os.path.exists( fileName ):
    try:
      with open(fileName, "r") as f:
        coarseData = [ elt.strip() for elt in f.readlines() ]
      intervalInMs = int( coarseData[0] )
      coarseData = coarseData[1:]
      cpu = [float(elt) for elt in coarseData[::2]]
      mem_rss = [ int(elt) for elt in coarseData[1::2]]
    except Exception:
      pass
  return CPUMemInfo(intervalInMs,cpu,mem_rss)

def ReadCPUMemInfo( monitoringInfo ):
  """
  Retrieve the CPU/memory data of a monitoring session.

  Args:
  -----
  monitoringInfo (MonitoringInfo): the info returned by LaunchMonitoring

  Returns
  -------
  CPUMemInfo instance
  """
  return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
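
# Hedged usage sketch (not part of the original module), assuming
# KernelBasis.LaunchMonitoring/StopMonitoring are operational in this session:
# sample the CPU/RSS of the current process every 250 ms around some work.
def _exampleCpuMemMonitoring():
  import time
  with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( 250 ) ) as monitoringParams:
    time.sleep( 1.0 )                              # the monitored work goes here
    cpuMemInfo = ReadCPUMemInfo( monitoringParams )
  return cpuMemInfo                                # pairs of (cpu %, rss bytes)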
class InodeSizeInfo:
  def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
    """
    Args:
    -----
    dirNameMonitored (str)
    intervalInMs (int)
    timeStamps (list<datetimestruct>)
    nbInodes (list<int>)
    volumeOfDir (list<str>)
    """
    self._dir_name_monitored = dirNameMonitored
    self._interval_in_ms = intervalInMs
    self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]

  def __str__(self):
    st = """Directory monitored : {self.dirNameMonitored}
Interval in ms : {self.intervalInMs}
Data : {self.data}
""".format( **locals() )
    return st

  @property
  def dirNameMonitored(self):
    return self._dir_name_monitored

  @property
  def intervalInMs(self):
    return self._interval_in_ms

  @property
  def data(self):
    """
    List of triplets. The first element of each triplet is a datetime timestamp;
    the second element is the number of inodes; the third element is the size.
    """
    return self._data
def ReadInodeSizeInfoInternal( fileName ):
  import datetime
  with open(fileName, "r") as f:
    coarseData = [ elt.strip() for elt in f.readlines() ]
  dirNameMonitored = coarseData[0] ; intervalInMs = int( coarseData[1] ) ; coarseData = coarseData[2:]
  tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
  nbInodes = [int(elt) for elt in coarseData[1::3]]
  volumeOfDir = coarseData[2::3]
  return InodeSizeInfo(dirNameMonitored,intervalInMs,tss,nbInodes,volumeOfDir)

def ReadInodeSizeInfo( monitoringInfo ):
  """
  Retrieve the number of inodes and the size of the monitored directory.

  Args:
  -----
  monitoringInfo (MonitoringInfo): the info returned by LaunchMonitoring

  Returns
  -------
  InodeSizeInfo instance
  """
  return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
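
# Hedged usage sketch (not part of the original module): watch the inode count
# and size of a directory (here "/tmp", an arbitrary example) during some work.
def _exampleFileSystemMonitoring():
  import time
  with GenericPythonMonitoringLauncherCtxMgr( FileSystemMonitoring( 1000, "/tmp" ) ) as monitoringParams:
    time.sleep( 3.0 )                              # the monitored work goes here
    inodeSizeInfo = ReadInodeSizeInfo( monitoringParams )
  return inodeSizeInfo                             # (timestamp, #inodes, size) triplets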
class SeqByteReceiver:
  # 2 GB limit above which the transfer is split into chunks
  CHUNK_SIZE = 2000000000

  def __init__(self,sender):
    self._obj = sender

  def __del__(self):
    self._obj.UnRegister()

  def data(self):
    size = self._obj.getSize()
    if size <= SeqByteReceiver.CHUNK_SIZE:
      return self.fetchOneShot( size )
    else:
      return self.fetchByChunks( size )

  def fetchOneShot(self,size):
    return self._obj.sendPart(0,size)

  def fetchByChunks(self,size):
    """
    To avoid a memory peak, parts over 2 GB are sent in pieces of EFF_CHUNK_SIZE bytes.
    """
    data_for_split_case = bytes(0)
    EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
    iStart = 0 ; iEnd = EFF_CHUNK_SIZE
    while iStart != iEnd and iEnd <= size:
      part = self._obj.sendPart(iStart,iEnd)
      data_for_split_case = bytes(0).join( [data_for_split_case,part] )
      iStart = iEnd ; iEnd = min(iStart + EFF_CHUNK_SIZE, size)
    return data_for_split_case
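
# Hedged sketch (not part of the original module): client-side retrieval of a
# result produced by executeSecond. `senderRef` stands for the SALOME::SenderByte
# reference returned by the remote node; SeqByteReceiver transparently switches
# to chunked transfer above CHUNK_SIZE and UnRegisters the servant on deletion.
def _exampleFetchResult(senderRef):
  receiver = SeqByteReceiver( senderRef )
  rawBytes = receiver.data()                       # one-shot or chunked
  return pickle.loads( rawBytes )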
FinalCode = """import pickle
from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
import CORBA
orb = CORBA.ORB_init([''])
codeFileName = "{}"
inputFileName = "{}"
outputFileName = "{}"
outputsKeys = {}
exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
with open(inputFileName,"rb") as f:
  context = pickle.load( f )
context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
with open(codeFileName,"r") as f:
  code = f.read()
exec( code , context )
# filter the part of the context to be exported to the father process
context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
with open(outputFileName,"wb") as f:
  pickle.dump( context, f )
"""
class PythonFunctionEvaluatorParams:
  def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
    self._main_filename = mainFileName
    self._code_filename = codeFileName
    self._in_context_filename = inContextFileName
    self._out_context_filename = outContextFileName

  @property
  def result(self):
    with open(self._out_context_filename,"rb") as f:
      return pickle.load( f )

  def destroyOnOK(self):
    for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
      if os.path.exists( fileToDestroy ):
        os.unlink( fileToDestroy )

  def destroyOnKO(self, containerRef):
    """
    Called in the context of a failure when replay mode is activated.
    """
    for fileToDestroy in [self._out_context_filename]:
      if os.path.exists( fileToDestroy ):
        os.unlink( fileToDestroy )
    # register the files needed for replay into the log-file group of the container
    containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])

  @property
  def replayOperations(self):
    return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))

  @property
  def cleanOperations(self):
    return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )

  def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
    if returnCode == 0:
      return f"return with zero code ({returnCode})"
    else:
      if keepFilesToReplay:
        return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
{self.replayOperations}
{self.cleanOperations}
"""
      else:
        return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
"""
def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay ):
  """
  Equivalent of exec(code,context), but executed in a separate subprocess so that a hard crash of the executed code cannot bring the current process down.

  Args:
  -----
  code (str) : Python code to be executed using context
  context (dict) : context to be used for the execution. This context is updated in accordance with the execution of code.
  outargsname (list<str>) : list of arguments to be exported
  containerRef (Engines.Container) : container ref (used to register the files kept for replay when keepFilesToReplay is set to True)
  instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession used to rebuild remotely the reference needed to log information
  keepFilesToReplay (bool) : if True, when something goes wrong during the execution, all the files needed to replay the case post mortem are kept. If False, only the error is reported and the replay files are destroyed.

  Returns
  -------
  ScriptExecInfo : instance on the server side

  Note: context is modified by this method; the elements of outargsname are added to it with the values coming from the evaluation.
  """
  import tempfile
  import pickle
  import subprocess as sp
  import CORBA
  def InternalExecResistant( code, context, outargsname):
    orb = CORBA.ORB_init([''])
    iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle ) # ref ContainerScriptPerfLog_ptr
    EXEC_CODE_FNAME_PXF = "execsafe_"
    def RetrieveUniquePartFromPfx( fname ):
      return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
    with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
      codeFd.write( code )
      codeFd.flush()
      codeFileName = os.path.basename( codeFd.name )
      contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
      with open(contextFileName,"wb") as contextFd:
        pickle.dump( context, contextFd)
      resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
      mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
      with open(mainExecFileName,"w") as f:
        f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
      p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
      stdout, stderr = p.communicate()
      returnCode = p.returncode
    return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
  ret = instanceOfLogOfCurrentSession._current_instance
  returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
  stdout = stdout.decode()
  stderr = stderr.decode()
  sys.stdout.write( stdout ) ; sys.stdout.flush()
  sys.stderr.write( stderr ) ; sys.stderr.flush()
  if returnCode == 0:
    pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
    if len(pcklData) > 0:
      ret = pickle.loads( pcklData )
    context.update( evParams.result )
    evParams.destroyOnOK()
    return ret
  if returnCode != 0:
    if keepFilesToReplay:
      evParams.destroyOnKO( containerRef )
    else:
      evParams.destroyOnOK()
    raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True)

def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False)

def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  exec( code, context )
  return instanceOfLogOfCurrentSession._current_instance
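
# Hedged sketch (not part of the original module): ExecLocal in isolation, with
# the replay stub defined below standing in for a live log session. The
# crash-proof variants take the same signature but run the code in a subprocess.
def _exampleExecLocal():
  context = {}
  ExecLocal( "a = 1 + 1", context, ["a"], None, LogOfCurrentExecutionSessionStub() )
  return context["a"]   # -> 2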
class LogOfCurrentExecutionSessionAbs(abc.ABC):
  def __init__(self):
    self._current_instance = ScriptExecInfo()

  def addInfoOnLevel2(self, key, value):
    setattr(self._current_instance,key,value)

  def addFreestyleAndFlush(self, value):
    raise RuntimeError("Must be overloaded")

class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
  def __init__(self, handleToCentralizedInst):
    super().__init__()
    self._remote_handle = handleToCentralizedInst

  def addFreestyleAndFlush(self, value):
    self._current_instance.freestyle = value
    self.finalizeAndPushToMaster()

  def finalizeAndPushToMaster(self):
    self._remote_handle.assign( pickle.dumps( self._current_instance ) )

class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
  """
  This class stubs LogOfCurrentExecutionSession in the context of replay, where the server (handleToCentralizedInst) has vanished.
  """
  def __init__(self, handleToCentralizedInst = None):
    super().__init__()

  def addFreestyleAndFlush(self, value):
    pass
class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
  """The implementation of the PyScriptNode CORBA IDL that executes a script"""
  def __init__(self, nodeName, code, poa, my_container, logscript):
    """Initialize the node: compile the code in the local context."""
    Generic.__init__(self,poa)
    self.nodeName = nodeName
    self.code = code
    self.my_container_py = my_container
    self.my_container = my_container._container
    linecache.cache[nodeName] = 0, None, code.split('\n'), nodeName
    self.ccode = compile(code,nodeName,'exec')
    self.context = {}
    self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
    self._log_script = logscript
    self._current_execution_session = None
    sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture the log per execution session
  @abc.abstractmethod
  def executeNow(self, outargsname):
    raise RuntimeError("Must be overloaded")

  def __del__(self):
    # force removal of self.context. Don't know why it's not done by default.
    self.removeAllVarsInContext()

  def getContainer(self):
    return self.my_container

  def defineNewCustomVar(self,varName,valueOfVar):
    self.context[varName] = pickle.loads(valueOfVar)

  def executeAnotherPieceOfCode(self,code):
    """Called for initialization of the container lodging self."""
    try:
      ccode = compile(code,self.nodeName,'exec')
      exec(ccode, self.context)
    except Exception:
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" % (self.nodeName,code),0))
  def assignNewCompiledCode(self,codeStr):
    try:
      self.code = codeStr
      self.ccode = compile(codeStr,self.nodeName,'exec')
    except Exception:
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" % (self.nodeName,codeStr),0))

  def executeSimple(self, key, val):
    """
    Same as the execute method except that no pickle mechanism is involved here. No output is expected.
    """
    try:
      self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
      exec(self.ccode,self.context)
    except Exception:
      exc_typ,exc_val,exc_fr = sys.exc_info()
      l = traceback.format_exception(exc_typ,exc_val,exc_fr)
      print("".join(l)) ; sys.stdout.flush() # print the error also in the logs of the remote container
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))
  def execute(self,outargsname,argsin):
    """Execute the script stored in attribute ccode with pickled args (argsin)."""
    try:
      argsname,kws = pickle.loads(argsin)
      self.context.update(kws)
      exec(self.ccode, self.context)
      argsout = []
      for arg in outargsname:
        if arg not in self.context:
          raise KeyError("There is no variable %s in context" % arg)
        argsout.append(self.context[arg])
      argsout = pickle.dumps(tuple(argsout),-1)
      return argsout
    except Exception:
      exc_typ,exc_val,exc_fr = sys.exc_info()
      l = traceback.format_exception(exc_typ,exc_val,exc_fr)
      print("".join(l)) ; sys.stdout.flush() # print the error also in the logs of the remote container
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))
  def executeFirst(self,argsin):
    """Same as the first part of self.execute, split off to reduce the memory peak."""
    def ArgInManager(self,argsin):
      argsInPy = SeqByteReceiver( argsin )
      data = argsInPy.data()
      self.addInfoOnLevel2("inputMem",len(data))
      _,kws = pickle.loads(data)
      return kws
    try:
      self.beginOfCurrentExecutionSession()
      self.addTimeInfoOnLevel2("startInputTime")
      # scoped in a function to force the call of SeqByteReceiver's destructor
      kws = ArgInManager(self,argsin)
      vis = InOutputObjVisitor()
      for elt in kws:
        # fetch real data if necessary
        kws[elt] = UnProxyObjectSimple( kws[elt],vis)
      self.addInfoOnLevel2("inputHDDMem",vis)
      self.context.update(kws)
      self.addTimeInfoOnLevel2("endInputTime")
    except Exception:
      exc_typ,exc_val,exc_fr = sys.exc_info()
      l = traceback.format_exception(exc_typ,exc_val,exc_fr)
      print("".join(l)) ; sys.stdout.flush() # print the error also in the logs of the remote container
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))
  def executeSecond(self,outargsname):
    """Same as the second part of self.execute, split off to reduce the memory peak."""
    try:
      self.addTimeInfoOnLevel2("startExecTime")
      self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
      with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams:
        self._current_execution_session._current_instance = self.executeNow( outargsname )
        cpumeminfo = ReadCPUMemInfo( monitoringParams )
      self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
      del monitoringParams
      self.addTimeInfoOnLevel2("endExecTime")
      self.addTimeInfoOnLevel2("startOutputTime")
      argsout = []
      for arg in outargsname:
        if arg not in self.context:
          raise KeyError("There is no variable %s in context" % arg)
        argsout.append(self.context[arg])
      ret = [ ]
      outputMem = 0
      vis = InOutputObjVisitor()
      for arg in argsout:
        # the proxy mechanism is hooked in here
        argPickle = SpoolPickleObject( arg, vis )
        retArg = SenderByte_i( self.poa,argPickle )
        id_o = self.poa.activate_object(retArg)
        retObj = self.poa.id_to_reference(id_o)
        ret.append( retObj._narrow( SALOME.SenderByte ) )
        outputMem += len(argPickle)
      self.addInfoOnLevel2("outputMem",outputMem)
      self.addInfoOnLevel2("outputHDDMem",vis)
      self.addTimeInfoOnLevel2("endOutputTime")
      self.endOfCurrentExecutionSession()
      return ret
    except Exception:
      exc_typ,exc_val,exc_fr = sys.exc_info()
      l = traceback.format_exception(exc_typ,exc_val,exc_fr)
      print("".join(l)) ; sys.stdout.flush() # print the error also in the logs of the remote container
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))
  def listAllVarsInContext(self):
    pat = re.compile("^__([a-z]+)__$")
    return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]

  def removeAllVarsInContext(self):
    for elt in self.listAllVarsInContext():
      del self.context[elt]

  def getValueOfVarInContext(self,varName):
    try:
      return pickle.dumps(self.context[varName],-1)
    except Exception:
      exc_typ,exc_val,exc_fr = sys.exc_info()
      l = traceback.format_exception(exc_typ,exc_val,exc_fr)
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % self.nodeName,0))

  def assignVarInContext(self, varName, value):
    try:
      self.context[varName][0] = pickle.loads(value)
    except Exception:
      exc_typ,exc_val,exc_fr = sys.exc_info()
      l = traceback.format_exception(exc_typ,exc_val,exc_fr)
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % self.nodeName,0))

  def callMethodOnVarInContext(self, varName, methodName, args):
    try:
      return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
    except Exception:
      exc_typ,exc_val,exc_fr = sys.exc_info()
      l = traceback.format_exception(exc_typ,exc_val,exc_fr)
      raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % self.nodeName,0))
  def beginOfCurrentExecutionSession(self):
    self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
    self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session

  def endOfCurrentExecutionSession(self):
    self._current_execution_session.finalizeAndPushToMaster()
    self._current_execution_session = None

  def addInfoOnLevel2(self, key, value):
    self._current_execution_session.addInfoOnLevel2(key, value)

  def addTimeInfoOnLevel2(self, key):
    from datetime import datetime
    self._current_execution_session.addInfoOnLevel2(key,datetime.now())
class PyScriptNode_i(PyScriptNode_Abstract_i):
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # in-process execution works on the compiled code
    return ExecLocal(self.ccode,self.context,outargsname,self.my_container,self._current_execution_session)

class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # out-of-process execution needs the source string, not the compiled code
    return ExecCrashProofWithoutReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)

class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    return ExecCrashProofWithReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)
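
# The three concrete node flavours differ only in their executeNow strategy:
# - PyScriptNode_i                     : in-process execution (ExecLocal); a hard crash of the script kills the container.
# - PyScriptNode_OutOfProcess_i        : execution in a subprocess; on failure the replay files are destroyed, only the error is reported.
# - PyScriptNode_OutOfProcess_Replay_i : execution in a subprocess; on failure the replay files are kept and registered to the container.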