1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # File : SALOME_PyNode.py
22 # Author : Christian CAREMOLI, EDF
36 from SALOME_ContainerHelper import ScriptExecInfo
# Name under which the hosting container object is published in the script globals.
MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
# Name under which the per-session performance log is published in the script globals.
MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
class Generic(SALOME__POA.GenericObj):
  """A Python implementation of the GenericObj CORBA IDL (reference-counting servant base)."""
  # NOTE(review): Register/UnRegister/Destroy/__del__ def lines are elided in this
  # sampled view; the fragments below belong to those methods.
  def __init__(self,poa):
    #print("Register called : %d"%self.cnt)
    #print("UnRegister called : %d"%self.cnt)
    # last reference dropped: deactivate this servant in its POA
    oid=self.poa.servant_to_id(self)
    self.poa.deactivate_object(oid)
    # Destroy() kept for backward compatibility only
    print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
    #print("Destuctor called")
class PyNode_i (Engines__POA.PyNode,Generic):
  """The implementation of the PyNode CORBA IDL"""
  # NOTE(review): try/except framing lines are elided in this sampled view; the
  # dangling sys.exc_info() sequences below belong to except branches.
  def __init__(self, nodeName,code,poa,my_container):
    """Initialize the node : compilation in the local context"""
    Generic.__init__(self,poa)
    self.nodeName=nodeName
    self.my_container=my_container._container
    # feed linecache so tracebacks display the script source under the node name
    linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
    ccode=compile(code,nodeName,'exec')
    self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
    exec(ccode, self.context)

  def getContainer(self):
    return self.my_container

  def defineNewCustomVar(self,varName,valueOfVar):
    # valueOfVar arrives pickled from the client side
    self.context[varName] = pickle.loads(valueOfVar)

  def executeAnotherPieceOfCode(self,code):
    """Called for initialization of container lodging self."""
    ccode=compile(code,self.nodeName,'exec')
    exec(ccode, self.context)
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))

  def execute(self,funcName,argsin):
    """Execute the function funcName found in local context with pickled args (argsin)"""
    argsin,kws=pickle.loads(argsin)
    func=self.context[funcName]
    argsout=func(*argsin,**kws)
    argsout=pickle.dumps(argsout,-1)
    exc_typ,exc_val,exc_fr=sys.exc_info()
    l=traceback.format_exception(exc_typ,exc_val,exc_fr)
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
class SenderByte_i(SALOME__POA.SenderByte,Generic):
  """Servant streaming a bytes buffer to remote clients chunk by chunk."""
  def __init__(self,poa,bytesToSend):
    Generic.__init__(self,poa)
    self.bytesToSend = bytesToSend
    # (getSize — def line elided in this view) total number of bytes available
    return len(self.bytesToSend)

  def sendPart(self,n1,n2):
    # return the [n1:n2) slice of the underlying buffer
    return self.bytesToSend[n1:n2]
# Env var naming the directory where big objects are dumped to file.
SALOME_FILE_BIG_OBJ_DIR = "SALOME_FILE_BIG_OBJ_DIR"
# Env var overriding the proxy-activation size threshold (bytes).
SALOME_BIG_OBJ_ON_DISK_THRES_VAR = "SALOME_BIG_OBJ_ON_DISK_THRES"
# Default threshold (bytes) above which pickled objects go through on-disk proxies.
SALOME_BIG_OBJ_ON_DISK_THRES_DFT = 50000000
# Process-wide reference counts of proxy files, keyed by file name.
DicoForProxyFile = { }
def GetSizeOfBufferedReader(f):
  """
  Return the size in bytes of an opened file.

  Args:
  ----
  f (io.IOBase): buffered reader returned by open
  """
  # seek to end to measure, then restore the previous position
  # (the `pos = f.tell()` / size capture lines are elided in this view)
  f.seek(0,io.SEEK_END)
  f.seek(pos,io.SEEK_SET)
def GetObjectFromFile(fname, visitor = None):
  """Unpickle the object stored in file *fname*; report on-disk size and name to *visitor* if any."""
  with open(fname,"rb") as f:
    # record the HDD footprint before loading (guard `if visitor:` elided in this view)
    visitor.setHDDMem( GetSizeOfBufferedReader(f) )
    visitor.setFileName( fname )
def DumpInFile(obj,fname):
  """Write the already-pickled bytes *obj* into file *fname* (write call elided in this view)."""
  with open(fname,"wb") as f:
def IncrRefInFile(fname):
  """Increment the process-wide reference counter attached to proxy file *fname*."""
  if fname in DicoForProxyFile:
    DicoForProxyFile[fname] += 1
    # (else branch — `else:` elided) first explicit incrRef: creation ref + this one
    DicoForProxyFile[fname] = 2
def DecrRefInFile(fname):
  """Decrement refcount of proxy file *fname*; remove the file when the count drops to zero."""
  # NOTE(review): branch structure partially elided in this sampled view.
  if fname not in DicoForProxyFile:
    cnt = DicoForProxyFile[fname]
    DicoForProxyFile[fname] -= 1
    del DicoForProxyFile[fname]
  if os.path.exists(fname):
def GetBigObjectOnDiskThreshold():
  """Return the size threshold (bytes) above which the on-disk proxy mechanism activates."""
  if SALOME_BIG_OBJ_ON_DISK_THRES_VAR in os.environ:
    return int( os.environ[SALOME_BIG_OBJ_ON_DISK_THRES_VAR] )
  return SALOME_BIG_OBJ_ON_DISK_THRES_DFT
def ActivateProxyMecanismOrNot( sizeInByte ):
  """Tell whether an object of pickled size *sizeInByte* must be proxified on disk."""
  thres = GetBigObjectOnDiskThreshold()
  return sizeInByte > thres
def GetBigObjectDirectory():
  """Return the directory (from env var SALOME_FILE_BIG_OBJ_DIR) used to dump big objects."""
  if SALOME_FILE_BIG_OBJ_DIR not in os.environ:
    raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
  # expand ~ and $VARS so the env var can hold an unexpanded path
  return os.path.expanduser( os.path.expandvars( os.environ[SALOME_FILE_BIG_OBJ_DIR] ) )
def GetBigObjectFileName():
  """
  Return a filename in the most secure manner (see tempfile documentation).
  """
  # file is deleted on close; only the generated unique name is reused afterwards
  # (the `return f.name` line is elided in this view)
  with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
class BigObjectOnDiskBase:
  """Base proxy class standing for a big pickled object dumped into a file."""
  # NOTE(review): several def lines (incrRef/decrRef/__del__/toFloat/toInt/toStr...)
  # are elided in this sampled view; fragments below belong to those methods.
  def __init__(self, fileName, objSerialized):
    """
    :param fileName: the file used to dump into.
    :param objSerialized: the object in pickeled form
    :type objSerialized: bytes
    """
    self._filename = fileName
    # attribute _destroy is here to tell client side or server side
    # only client side can be with _destroy set to True. server side due to risk of concurrency
    # so pickled form of self must be done with this attribute set to False.
    self._destroy = False
    self.__dumpIntoFile(objSerialized)

  def getDestroyStatus(self):
    IncrRefInFile( self._filename )
    # should never happen !
    # NOTE(review): exception is constructed but never raised — missing `raise`? verify.
    RuntimeError("Invalid call to incrRef !")
    DecrRefInFile( self._filename )
    # should never happen !
    # NOTE(review): same here — RuntimeError is built but not raised.
    RuntimeError("Invalid call to decrRef !")

  def unlinkOnDestructor(self):
  def doNotTouchFile(self):
    """
    Method called slave side. The life cycle management of file is client side not slave side.
    """
    self._destroy = False
    DecrRefInFile( self._filename )

  def getFileName(self):
    return self._filename

  def __dumpIntoFile(self, objSerialized):
    DumpInFile( objSerialized, self._filename )

  def get(self, visitor = None):
    # load the pickled payload back from disk (size reported to visitor)
    obj = GetObjectFromFile( self._filename, visitor )
    return float( self.get() )
    return int( self.get() )
    if isinstance(obj,str):
    raise RuntimeError("Not a string")
class BigObjectOnDisk(BigObjectOnDiskBase):
  """Disk proxy for a plain (non-sequence) pickled object."""

  def __init__(self, fileName, objSerialized):
    # delegate dump/refcount setup entirely to the base class
    super().__init__(fileName, objSerialized)
class BigObjectOnDiskListElement(BigObjectOnDiskBase):
  """Proxy for a single element of a big sequence dumped on disk."""
  def __init__(self, pos, length, fileName):
    # base __init__ deliberately not called: the file already exists and is owned elsewhere
    # (the `self._pos = pos` line is elided in this view)
    self._filename = fileName
    self._destroy = False
    self._length = length

  def get(self, visitor = None):
    # load the whole backing sequence, then pick the element at the recorded position
    fullObj = BigObjectOnDiskBase.get(self, visitor)
    return fullObj[ self._pos ]

  # (__len__ def elided in this view)
  def __getitem__(self, i):
    return len(self.get())
class BigObjectOnDiskSequence(BigObjectOnDiskBase):
  """Proxy for a pickled sequence (list/tuple) dumped on disk."""
  def __init__(self, length, fileName, objSerialized):
    BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
    self._length = length

  def __getitem__(self, i):
    # per-element proxy sharing the same backing file
    return BigObjectOnDiskListElement(i, self._length, self.getFileName())
class BigObjectOnDiskList(BigObjectOnDiskSequence):
  """Disk proxy specialized for list objects."""

  def __init__(self, length, fileName, objSerialized):
    super().__init__(length, fileName, objSerialized)
class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
  """Disk proxy specialized for tuple objects."""

  def __init__(self, length, fileName, objSerialized):
    super().__init__(length, fileName, objSerialized)
def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
  """
  Return a proxy instance standing for the pickled form of the object given in input.

  Args:
  ----
  obj (pickelable type) : object to be proxified
  pickleObjInit (bytes) : Optional. Original pickled form of the object if already computed; generated otherwise.
  visitor : optional visitor notified of the HDD footprint and file name

  Returns:
  -------
  BigObjectOnDiskBase: proxy instance
  """
  # NOTE(review): the final `return proxyObj` and the `if visitor:` / `else:` lines
  # are elided in this sampled view.
  pickleObj = pickleObjInit
  if pickleObj is None:
    pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
  fileName = GetBigObjectFileName()
  visitor.setHDDMem( len(pickleObj) )
  visitor.setFileName(fileName)
  # choose the proxy flavour matching the object kind
  if isinstance( obj, list):
    proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
  elif isinstance( obj, tuple):
    proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
  proxyObj = BigObjectOnDisk( fileName , pickleObj )
def SpoolPickleObject( obj, visitor = None ):
  """Pickle *obj*; return the raw pickle if small, otherwise the pickle of an on-disk proxy."""
  # NOTE(review): both return statements are elided in this sampled view.
  with InOutputObjVisitorCM(visitor) as v:
    pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
    # below threshold: raw pickled form is used directly
    if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
    proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
    pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
361 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
def UnProxyObjectSimple( obj, visitor = None ):
  """
  Method to be called in Remote mode. Alters the obj _status attribute,
  because the slave process does not participate in the reference counting.

  Args:
  ----
  visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
  """
  # NOTE(review): several branch bodies / return statements are elided in this sampled view.
  with InOutputObjVisitorCM(visitor) as v:
    logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
    if isinstance(obj,BigObjectOnDiskBase):
    elif isinstance( obj, list):
      # recursively unproxify list elements
      retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
def UnProxyObjectSimpleLocal( obj ):
  """
  Method to be called in Local mode. Does not alter the PyObj counter.
  """
  # NOTE(review): branch bodies / return statements are elided in this sampled view.
  if isinstance(obj,BigObjectOnDiskBase):
  elif isinstance( obj, list):
    # recursively unproxify list elements
    retObj.append( UnProxyObjectSimpleLocal(elt) )
def __init__(self, fileName):
  # path of the file held by this object
  self._filename = fileName
  # (filename property — decorator/def elided in this view)
  return self._filename
class FileDeleter(FileHolder):
  """FileHolder variant that unlinks its file at destruction time."""
  def __init__(self, fileName):
    super().__init__( fileName )

    # (__del__ — def elided in this view) remove the file if still present
    if os.path.exists( self._filename ):
      os.unlink( self._filename )
class MonitoringInfo:
  """Artifacts of a launched monitoring: script file, sampling period, output file, pid."""
  # NOTE(review): the accessors below are properties in the full file; the
  # @property decorators and the pid getter are elided in this sampled view.
  def __init__(self, pyFileName, intervalInMs, outFileName, pid):
    self._py_file_name = pyFileName
    self._interval_in_ms = intervalInMs
    self._out_file_name = outFileName

  def pyFileName(self):
    # monitoring python script (FileHolder/FileDeleter instance)
    return self._py_file_name

  def pid(self, value):

  def outFileName(self):
    # file the monitoring loop appends its measures to
    return self._out_file_name

  def intervalInMs(self):
    return self._interval_in_ms
def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
    """
    This method loops indefinitely every intervalInMs milliseconds to scan
    number of inodes and size of content recursively included into the input directory.

    Args:
    ----
    outFileName (str) : name of file inside the results will be written. If None a new file is generated

    See also CPUMemoryMonitoring
    """
    dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
    # outFileNameSave stores the content of outFileName during phase of dumping
    with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
      outFileNameSave = f.name
    with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
    tempOutFile = outFileName
    if tempOutFile is None:
      tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
    # generated script: appends (timestamp, #inodes, dir size) triplets forever
    # NOTE(review): the f.write(""" opening line of the template is elided; the
    # template text below is runtime data and is kept byte-identical.
    with open(tempPyFile,"w") as f:
import subprocess as sp
with open("{tempOutFile}","a") as f:
  f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
  f.write( "{{}}\\n".format( "{intervalInMs}" ) )
  nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]), ), shell = True).decode().strip()
  st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
  szOfDirStr = re.split("[\s]+",st)[0]
  f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
  f.write( "{{}}\\n".format( str( nbinodes ) ) )
  f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
  time.sleep( {intervalInMs} / 1000.0 )
""".format( **locals()))
    logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
    pyFileName = FileDeleter( tempPyFile )
    # auto-generated output file is deleted with the info object; user-supplied one is kept
    if outFileName is None:
      outFileName = FileDeleter( tempOutFile )
      outFileName = FileHolder(outFileName)
    return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
  """
  Launch a subprocess monitoring self process.
  This monitoring subprocess is a python process launching every intervalInMs ms evaluation of
  CPU usage and RSS memory of the calling process.
  Communication between subprocess and self is done by file.

  Args:
  ----
  outFileName (str) : name of file inside the results will be written. If None a new file is generated

  See also FileSystemMonitoring
  """
  def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
    # generate the psutil-based monitoring script and pick its output file name
    with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
    tempOutFile = outFileName
    if tempOutFile is None:
      tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
    # template below is runtime data (written to the monitoring script) — kept byte-identical
    with open(tempPyFile,"w") as f:
      f.write("""import psutil
process = psutil.Process( pid )
with open("{}","a") as f:
  f.write( "{{}}\\n".format( "{}" ) )
    f.write( "{{}}\\n".format( str( process.cpu_percent() ) ) )
    f.write( "{{}}\\n".format( str( process.memory_info().rss ) ) )
    time.sleep( {} / 1000.0 )
""".format(pid, tempOutFile, intervalInMs, intervalInMs))
    # auto-generated output file is destroyed with its holder; user-supplied one is kept
    if outFileName is None:
      autoOutFile = FileDeleter(tempOutFile)
      autoOutFile = FileHolder(tempOutFile)
    return FileDeleter(tempPyFile),autoOutFile
  pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
  return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
class GenericPythonMonitoringLauncherCtxMgr:
  """Context manager starting a monitoring subprocess on enter and stopping it on exit."""
  def __init__(self, monitoringParams):
    """
    Args:
    ----
    monitoringParams (MonitoringInfo)
    """
    self._monitoring_params = monitoringParams

    # (__enter__ — def elided in this view) launch the monitoring subprocess
    pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
    self._monitoring_params.pid = pid
    return self._monitoring_params

  def __exit__(self,exctype, exc, tb):
    StopMonitoring( self._monitoring_params )
def StopMonitoring( monitoringInfo ):
  """
  Kill monitoring subprocess.

  Args:
  ----
  monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
  """
  KernelBasis.StopMonitoring(monitoringInfo.pid)
def __init__(self, intervalInMs, cpu, mem_rss):
  """
  Args:
  ----
  cpu (list<float>) CPU usage
  mem_rss (list<int>) rss memory usage
  """
  self._interval_in_ms = intervalInMs
  # pair up samples: (cpu_percent, rss_bytes)
  self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
  # (__str__ — def elided in this view)
  st = """Interval in ms : {self.intervalInMs}
""".format( **locals() )
def intervalInMs(self):
  return self._interval_in_ms
  # (data property — decorator/def elided in this view)
  """
  list of pairs. First param of pair is cpu usage.
  Second param of pair is memory usage.
  """
def ReadCPUMemInfoInternal( fileName ):
  """Parse the monitoring output file *fileName* into a CPUMemInfo instance."""
  cpu = [] ; mem_rss = []
  if os.path.exists( fileName ):
    with open(fileName, "r") as f:
      coarseData = [ elt.strip() for elt in f.readlines() ]
    # first line stores the sampling period; the rest alternates cpu / rss samples
    intervalInMs = int( coarseData[0] )
    coarseData = coarseData[1:]
    cpu = [float(elt) for elt in coarseData[::2]]
    mem_rss = [ int(elt) for elt in coarseData[1::2]]
  return CPUMemInfo(intervalInMs,cpu,mem_rss)
def ReadCPUMemInfo( monitoringInfo ):
  """
  Retrieve CPU/Mem data of monitoring.

  Args:
  ----
  monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
  """
  return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
  """
  Args:
  ----
  timeStamps (list<datetimestruct>)
  volumeOfDir (list<str>)
  """
  self._dir_name_monitored = dirNameMonitored
  self._interval_in_ms = intervalInMs
  # triplets (timestamp, nb_inodes, dir_size_string)
  self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
  # (__str__ — def elided) NOTE(review): stray '$' before {self.intervalInMs} — verify.
  st = """Filename monitored : {self.dirNameMonitored}
Interval in ms : ${self.intervalInMs}
""".format( **locals() )
def dirNameMonitored(self):
  return self._dir_name_monitored
def intervalInMs(self):
  return self._interval_in_ms
  # (data property — decorator/def elided in this view)
  """
  list of triplets. First param of triplet is datetimestruct.
  Second param of triplet is #inodes.
  Third param of triplet is size.
  """
def ReadInodeSizeInfoInternal( fileName ):
  """Parse the filesystem monitoring output file into an InodeSizeInfo instance."""
  with open(fileName, "r") as f:
    coarseData = [ elt.strip() for elt in f.readlines() ]
  # header: monitored dir + period; then repeating (timestamp, #inodes, size) triplets
  dirNameMonitored = coarseData[0] ; intervalInMs = int( coarseData[1] ) ; coarseData = coarseData[2:]
  tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
  nbInodes = [int(elt) for elt in coarseData[1::3]]
  volumeOfDir = coarseData[2::3]
  return InodeSizeInfo(dirNameMonitored,intervalInMs,tss,nbInodes,volumeOfDir)
def ReadInodeSizeInfo( monitoringInfo ):
  """
  Retrieve nb of inodes and size of monitoring.

  Args:
  ----
  monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
  """
  return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
class SeqByteReceiver:
  """Client-side helper pulling a bytes payload from a SenderByte servant, chunked over 2GB."""
  # 2GB limit to trigger split into chunks
  CHUNK_SIZE = 2000000000
  def __init__(self,sender):
    # (__del__ — def elided in this view) release the remote reference
    self._obj.UnRegister()
    # (data — def elided) fetch whole payload, one-shot or chunked depending on size
    size = self._obj.getSize()
    if size <= SeqByteReceiver.CHUNK_SIZE:
      return self.fetchOneShot( size )
    return self.fetchByChunks( size )

  def fetchOneShot(self,size):
    return self._obj.sendPart(0,size)

  def fetchByChunks(self,size):
    """
    To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
    """
    data_for_split_case = bytes(0)
    EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
    iStart = 0 ; iEnd = EFF_CHUNK_SIZE
    while iStart!=iEnd and iEnd <= size:
      part = self._obj.sendPart(iStart,iEnd)
      data_for_split_case = bytes(0).join( [data_for_split_case,part] )
      # advance window; last chunk is clamped to the total size
      iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
    return data_for_split_case
# Template of the driver script used by the crash-proof out-of-process mode:
# it rebuilds the CORBA log handle, loads the pickled input context, executes the
# user code and dumps the filtered output context back for the parent process.
# (Template content is runtime data — kept byte-identical; its closing quote is
# on a line elided from this view.)
FinalCode = """import pickle
from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
orb = CORBA.ORB_init([''])
outputFileName = "{}"
exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
with open(inputFileName,"rb") as f:
  context = pickle.load( f )
with open(codeFileName,"r") as f:
exec( code , context )
# filter part of context to be exported to father process
context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
with open(outputFileName,"wb") as f:
  pickle.dump( context, f )
class PythonFunctionEvaluatorParams:
  """Bundle of the four files involved in one out-of-process execution (driver, code, in/out contexts)."""
  def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
    self._main_filename = mainFileName
    self._code_filename = codeFileName
    self._in_context_filename = inContextFileName
    self._out_context_filename = outContextFileName

    # (result property — decorator/def elided) unpickle the output context produced by the subprocess
    with open(self._out_context_filename,"rb") as f:
      return pickle.load( f )

  def destroyOnOK(self):
    # successful run: every temporary file can be removed
    for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
      if os.path.exists( fileToDestroy ):
        os.unlink( fileToDestroy )

  def destroyOnKO(self, containerRef):
    """
    Called in the context of failure with replay mode activated
    """
    for fileToDestroy in [self._out_context_filename]:
      if os.path.exists( fileToDestroy ):
        os.unlink( fileToDestroy )
    # register to container files group associated to the
    containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
    return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))

  def cleanOperations(self):
    return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )

  def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
    return f"return with non zero code ({returnCode})"
    # NOTE(review): f-string below interpolates self.cleanOperations without calling it — verify.
    if keepFilesToReplay:
      return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
{self.cleanOperations}
      return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay ):
  """
  Equivalent of exec(code,context) but executed in a separate subprocess to avoid making the current process crash.

  Args:
  ----
  code (str) : python code to be executed using context
  context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
  outargsname (list<str>) : list of arguments to be exported
  containerRef (Engines.Container) : Container ref (retrieving the Files created when keepFilesToReplay is set to False)
  instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
  keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destroyed.

  Returns:
  -------
  ScriptExecInfo : instance serverside

  Note: context will be modified by this method. elts in outargsname will be added with their corresponding value coming from evaluation.
  """
  # NOTE(review): several framing lines (returnCode==0 branch, returns) are elided
  # in this sampled view.
  import subprocess as sp
  def InternalExecResistant( code, context, outargsname):
    # materialize code and context into files, then run the driver script in a child python
    orb = CORBA.ORB_init([''])
    iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
    EXEC_CODE_FNAME_PXF = "execsafe_"
    def RetrieveUniquePartFromPfx( fname ):
      return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
    with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
    codeFileName = os.path.basename( codeFd.name )
    # sibling file names derive from the unique part of the code file name
    contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
    with open(contextFileName,"wb") as contextFd:
      pickle.dump( context, contextFd)
    resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
    mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
    with open(mainExecFileName,"w") as f:
      f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
    p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
    stdout, stderr = p.communicate()
    returnCode = p.returncode
    return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
  ret = instanceOfLogOfCurrentSession._current_instance
  returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
  # forward child output so it appears in the container logs
  stdout = stdout.decode()
  stderr = stderr.decode()
  sys.stdout.write( stdout ) ; sys.stdout.flush()
  sys.stderr.write( stderr ) ; sys.stderr.flush()
  # success path: harvest remote log, merge output context, clean files
  pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
  if len(pcklData) > 0:
    ret = pickle.loads( pcklData )
  context.update( evParams.result )
  evParams.destroyOnOK()
  # failure path: keep or destroy replay files depending on the mode
  if keepFilesToReplay:
    evParams.destroyOnKO( containerRef )
    evParams.destroyOnOK()
  raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """Out-of-process execution that keeps the replay files when the run fails."""
  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay=True)
def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """Out-of-process execution that discards the replay files when the run fails."""
  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay=False)
def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
  """In-process execution: run *code* with *context* as globals and return the current ScriptExecInfo."""
  # context is updated in place by the executed code
  exec( code, context )
  return instanceOfLogOfCurrentSession._current_instance
class LogOfCurrentExecutionSession:
  """Local accumulator of one execution session's ScriptExecInfo, mirrored to a centralized remote handle."""

  def __init__(self, handleToCentralizedInst):
    self._remote_handle = handleToCentralizedInst
    self._current_instance = ScriptExecInfo()

  def addFreestyleAndFlush(self, value):
    # record the freestyle payload and push it to the master immediately
    self._current_instance.freestyle = value
    self.finalizeAndPushToMaster()

  def addInfoOnLevel2(self, key, value):
    # store one measurement as an attribute of the current ScriptExecInfo
    setattr(self._current_instance, key, value)

  def finalizeAndPushToMaster(self):
    # ship the pickled session record to the centralized log servant
    self._remote_handle.assign(pickle.dumps(self._current_instance))
class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
  """The implementation of the PyScriptNode CORBA IDL that executes a script"""
  # NOTE(review): try/except framing lines, @abstractmethod decorator, __del__ def
  # and some blank/assignment lines (e.g. self.code, self.context init) are elided
  # in this sampled view; dangling sys.exc_info() sequences belong to except branches.
  def __init__(self, nodeName, code, poa, my_container, logscript):
    """Initialize the node : compilation in the local context"""
    Generic.__init__(self,poa)
    self.nodeName=nodeName
    self.my_container_py = my_container
    self.my_container=my_container._container
    # feed linecache so tracebacks display the script source under the node name
    linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
    self.ccode=compile(code,nodeName,'exec')
    self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
    self._log_script = logscript
    self._current_execution_session = None
    sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session

  # abstract hook: subclasses choose in-process or out-of-process execution
  def executeNow(self, outargsname):
    raise RuntimeError("Must be overloaded")

    # force removal of self.context. Don t know why it s not done by default
    self.removeAllVarsInContext()

  def getContainer(self):
    return self.my_container

  def defineNewCustomVar(self,varName,valueOfVar):
    # valueOfVar arrives pickled from the client side
    self.context[varName] = pickle.loads(valueOfVar)

  def executeAnotherPieceOfCode(self,code):
    """Called for initialization of container lodging self."""
    ccode=compile(code,self.nodeName,'exec')
    exec(ccode, self.context)
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))

  def assignNewCompiledCode(self,codeStr):
    self.ccode=compile(codeStr,self.nodeName,'exec')
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))

  def executeSimple(self, key, val):
    """
    Same as execute method except that no pickelization mecanism is implied here. No output is expected.
    """
    self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
    exec(self.ccode,self.context)
    exc_typ,exc_val,exc_fr=sys.exc_info()
    l=traceback.format_exception(exc_typ,exc_val,exc_fr)
    print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))

  def execute(self,outargsname,argsin):
    """Execute the script stored in attribute ccode with pickled args (argsin)"""
    argsname,kws=pickle.loads(argsin)
    self.context.update(kws)
    exec(self.ccode, self.context)
    # collect requested output variables from the execution context
    for arg in outargsname:
      if arg not in self.context:
        raise KeyError("There is no variable %s in context" % arg)
      argsout.append(self.context[arg])
    argsout=pickle.dumps(tuple(argsout),-1)
    exc_typ,exc_val,exc_fr=sys.exc_info()
    l=traceback.format_exception(exc_typ,exc_val,exc_fr)
    print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))

  def executeFirst(self,argsin):
    """ Same than first part of self.execute to reduce memory peak."""
    def ArgInMananger(self,argsin):
      # pull pickled kwargs from the remote sender, tracking input memory
      argsInPy = SeqByteReceiver( argsin )
      data = argsInPy.data()
      self.addInfoOnLevel2("inputMem",len(data))
      _,kws=pickle.loads(data)
    self.beginOfCurrentExecutionSession()
    self.addTimeInfoOnLevel2("startInputTime")
    # to force call of SeqByteReceiver's destructor
    kws = ArgInMananger(self,argsin)
    vis = InOutputObjVisitor()
    # fetch real data if necessary
    kws[elt] = UnProxyObjectSimple( kws[elt],vis)
    self.addInfoOnLevel2("inputHDDMem",vis)
    self.context.update(kws)
    self.addTimeInfoOnLevel2("endInputTime")
    exc_typ,exc_val,exc_fr=sys.exc_info()
    l=traceback.format_exception(exc_typ,exc_val,exc_fr)
    print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))

  def executeSecond(self,outargsname):
    """ Same than second part of self.execute to reduce memory peak."""
    self.addTimeInfoOnLevel2("startExecTime")
    self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
    # run the user code under CPU/memory monitoring
    with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams:
      self._current_execution_session._current_instance = self.executeNow( outargsname )
      cpumeminfo = ReadCPUMemInfo( monitoringParams )
    self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
    del monitoringParams
    self.addTimeInfoOnLevel2("endExecTime")
    self.addTimeInfoOnLevel2("startOutputTime")
    for arg in outargsname:
      if arg not in self.context:
        raise KeyError("There is no variable %s in context" % arg)
      argsout.append(self.context[arg])
    vis = InOutputObjVisitor()
    # the proxy mecanism is catched here
    argPickle = SpoolPickleObject( arg, vis )
    retArg = SenderByte_i( self.poa,argPickle )
    id_o = self.poa.activate_object(retArg)
    retObj = self.poa.id_to_reference(id_o)
    ret.append( retObj._narrow( SALOME.SenderByte ) )
    outputMem += len(argPickle)
    self.addInfoOnLevel2("outputMem",outputMem)
    self.addInfoOnLevel2("outputHDDMem",vis)
    self.addTimeInfoOnLevel2("endOutputTime")
    self.endOfCurrentExecutionSession()
    exc_typ,exc_val,exc_fr=sys.exc_info()
    l=traceback.format_exception(exc_typ,exc_val,exc_fr)
    print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))

  def listAllVarsInContext(self):
    # user variables = everything except dunders and the container entry
    pat = re.compile("^__([a-z]+)__$")
    return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]

  def removeAllVarsInContext(self):
    for elt in self.listAllVarsInContext():
      del self.context[elt]

  def getValueOfVarInContext(self,varName):
    return pickle.dumps(self.context[varName],-1)
    exc_typ,exc_val,exc_fr=sys.exc_info()
    l=traceback.format_exception(exc_typ,exc_val,exc_fr)
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))

  def assignVarInContext(self, varName, value):
    # target variable is expected to be a one-element container holding the value
    self.context[varName][0] = pickle.loads(value)
    exc_typ,exc_val,exc_fr=sys.exc_info()
    l=traceback.format_exception(exc_typ,exc_val,exc_fr)
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))

  def callMethodOnVarInContext(self, varName, methodName, args):
    # invoke methodName on the wrapped object with unpickled args, pickle the result back
    return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
    exc_typ,exc_val,exc_fr=sys.exc_info()
    l=traceback.format_exception(exc_typ,exc_val,exc_fr)
    raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))

  def beginOfCurrentExecutionSession(self):
    # open a fresh session log and expose it to the executed script
    self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
    self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session

  def endOfCurrentExecutionSession(self):
    self._current_execution_session.finalizeAndPushToMaster()
    self._current_execution_session = None

  def addInfoOnLevel2(self, key, value):
    self._current_execution_session.addInfoOnLevel2(key, value)

  def addTimeInfoOnLevel2(self, key):
    from datetime import datetime
    self._current_execution_session.addInfoOnLevel2(key,datetime.now())
class PyScriptNode_i(PyScriptNode_Abstract_i):
  """PyScriptNode flavour executing the compiled script directly inside the container process."""

  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # in-process execution of the pre-compiled code object
    return ExecLocal(self.ccode, self.context, outargsname, self.my_container, self._current_execution_session)
class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
  """PyScriptNode flavour executing the script source in a crash-proof subprocess (no replay files)."""

  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # subprocess needs the source string (self.code), not the compiled object
    return ExecCrashProofWithoutReplay(self.code, self.context, outargsname, self.my_container, self._current_execution_session)
class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
  """PyScriptNode flavour executing the script source in a crash-proof subprocess, keeping replay files on failure."""

  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # subprocess needs the source string (self.code), not the compiled object
    return ExecCrashProofWithReplay(self.code, self.context, outargsname, self.my_container, self._current_execution_session)