1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # File : SALOME_PyNode.py
22 # Author : Christian CAREMOLI, EDF
36 from SALOME_ContainerHelper import ScriptExecInfo
# Key under which the hosting container object is published in every
# node execution context (globals of the executed user code).
MY_CONTAINER_ENTRY_IN_GLBS = "my_container"

# Key under which the per-session performance log object is published in
# the execution context of a PyScriptNode.
MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
class Generic(SALOME__POA.GenericObj):
    """A Python implementation of the GenericObj CORBA IDL (reference counting)."""
    # NOTE(review): this listing is heavily elided — the headers/bodies of the
    # reference-counting methods (Register, UnRegister, Destroy, __del__) are
    # only partially visible below; confirm against the full source.
    def __init__(self,poa):
        # presumably stores the POA and initializes the reference counter — TODO confirm
        #print("Register called : %d"%self.cnt)
        #print("UnRegister called : %d"%self.cnt)
        # When the last reference is dropped the servant is deactivated in its POA.
        oid=self.poa.servant_to_id(self)
        self.poa.deactivate_object(oid)
        # Destroy() is kept only for backward compatibility with old clients.
        print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
        #print("Destructor called")
class PyNode_i (Engines__POA.PyNode,Generic):
    """The implementation of the PyNode CORBA IDL"""
    def __init__(self, nodeName,code,poa,my_container):
        """Initialize the node : compilation in the local context"""
        Generic.__init__(self,poa)
        self.nodeName=nodeName
        # NOTE(review): listing truncated — self.code and self.context are
        # presumably initialized here (as in PyScriptNode_Abstract_i); confirm.
        self.my_container=my_container._container
        # Register the source in linecache so tracebacks show the node code.
        linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
        ccode=compile(code,nodeName,'exec')
        # Publish the hosting container in the execution context.
        self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
        exec(ccode, self.context)

    def getContainer(self):
        # CORBA accessor: container hosting this node.
        return self.my_container

    def defineNewCustomVar(self,varName,valueOfVar):
        # Install a caller-pickled value as a new variable of the context.
        self.context[varName] = pickle.loads(valueOfVar)

    def executeAnotherPieceOfCode(self,code):
        """Called for initialization of container lodging self."""
        # NOTE(review): the try: wrapping this compile/exec is elided in the
        # listing; the raise below is the except-branch error report.
        ccode=compile(code,self.nodeName,'exec')
        exec(ccode, self.context)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))

    def execute(self,funcName,argsin):
        """Execute the function funcName found in local context with pickled args (argsin)"""
        # NOTE(review): try/except framing and the `return argsout` are elided
        # in this listing — confirm against the full source.
        argsin,kws=pickle.loads(argsin)
        func=self.context[funcName]
        argsout=func(*argsin,**kws)
        argsout=pickle.dumps(argsout,-1)
        # except-branch: format the traceback and surface it through CORBA.
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
class SenderByte_i(SALOME__POA.SenderByte,Generic):
    """CORBA servant exposing a bytes payload to remote clients part by part.

    Used together with SeqByteReceiver to stream results that may exceed the
    CORBA single-message comfort zone.
    """
    def __init__(self,poa,bytesToSend):
        Generic.__init__(self,poa)
        # Full payload kept in memory for the lifetime of the servant.
        self.bytesToSend = bytesToSend

    def getSize(self):
        """Total number of bytes available."""
        # Fix: this method's `def` header was lost in the garbled listing,
        # leaving the return statement floating at class level.
        return len(self.bytesToSend)

    def sendPart(self,n1,n2):
        """Return the slice [n1:n2] of the payload."""
        return self.bytesToSend[n1:n2]
# Environment variable: directory where big objects are spooled to disk.
SALOME_FILE_BIG_OBJ_DIR = "SALOME_FILE_BIG_OBJ_DIR"

# Environment variable overriding the big-object size threshold (bytes).
SALOME_BIG_OBJ_ON_DISK_THRES_VAR = "SALOME_BIG_OBJ_ON_DISK_THRES"

# Default threshold above which objects are proxified on disk: 50 MB.
SALOME_BIG_OBJ_ON_DISK_THRES_DFT = 50000000

# Module-level registry: proxy file name -> number of live references.
DicoForProxyFile = { }
def GetSizeOfBufferedReader(f):
    """
    Return the size in bytes of an opened buffered reader.

    The current read position of *f* is saved and restored, so the caller's
    position within the stream is unaffected.

    Args:
    -----
    f (io.IOBase): buffered reader returned by open

    Returns:
    --------
    int: size of the underlying stream in bytes
    """
    # Fix: the position save and the return statement were lost in the
    # garbled listing — as shown the function returned None and left the
    # stream seeked back to an undefined `pos`.
    pos = f.tell()
    f.seek(0,io.SEEK_END)
    size = f.tell()
    f.seek(pos,io.SEEK_SET)
    return size
def GetObjectFromFile(fname, visitor = None):
    """
    Unpickle and return the object stored in file *fname* (written by DumpInFile).

    Args:
    -----
    fname (str): path of the pickle file
    visitor: optional accounting visitor; when provided its setHDDMem /
             setFileName methods are fed with the file size and name

    Returns:
    --------
    the unpickled object
    """
    with open(fname,"rb") as f:
        # Fix: the `if visitor:` guard and the pickle.load/return were lost
        # in the garbled listing — unconditionally dereferencing the default
        # None visitor would raise AttributeError.
        if visitor:
            visitor.setHDDMem( GetSizeOfBufferedReader(f) )
            visitor.setFileName( fname )
        obj = pickle.load(f)
    return obj
def DumpInFile(obj,fname):
    """Write the already-pickled bytes *obj* into file *fname* (binary mode).

    Counterpart of GetObjectFromFile.
    """
    with open(fname,"wb") as f:
        # Fix: the write call was lost in the garbled listing — the function
        # created an empty file and discarded obj.
        f.write(obj)
def IncrRefInFile(fname):
    """
    Increment the reference counter of proxy file *fname* in DicoForProxyFile.

    A file not yet present gets counter 2 — presumably one reference for the
    creator plus the one being added here (see DecrRefInFile) — TODO confirm.
    """
    # Fix: the `else:` was lost in the garbled listing, which made the two
    # assignments sequential (the counter would always end up reset to 2).
    if fname in DicoForProxyFile:
        DicoForProxyFile[fname] += 1
    else:
        DicoForProxyFile[fname] = 2
def DecrRefInFile(fname):
    # Decrement the reference counter of proxy file *fname*; when the last
    # reference vanishes the entry is dropped and the file removed from disk.
    # NOTE(review): listing truncated — the branch structure (early value for
    # an unknown fname, the else-branch, the guard before `del`, and the final
    # os.unlink) is partially missing; confirm against the full source before
    # modifying this function.
    if fname not in DicoForProxyFile:
        cnt = DicoForProxyFile[fname]
        DicoForProxyFile[fname] -= 1
        del DicoForProxyFile[fname]
    if os.path.exists(fname):
        # presumably os.unlink(fname) follows here — elided in the listing
def GetBigObjectOnDiskThreshold():
    """
    Return the size threshold (bytes) above which objects are spooled to disk.

    The SALOME_BIG_OBJ_ON_DISK_THRES environment variable overrides the
    compiled-in default SALOME_BIG_OBJ_ON_DISK_THRES_DFT.
    """
    # Fix: the fallback return lost its `else:` in the garbled listing.
    if SALOME_BIG_OBJ_ON_DISK_THRES_VAR in os.environ:
        return int( os.environ[SALOME_BIG_OBJ_ON_DISK_THRES_VAR] )
    else:
        return SALOME_BIG_OBJ_ON_DISK_THRES_DFT
def ActivateProxyMecanismOrNot( sizeInByte ):
    """Tell whether a payload of *sizeInByte* bytes must go through the
    on-disk proxy mechanism (i.e. is strictly larger than the configured
    threshold)."""
    return sizeInByte > GetBigObjectOnDiskThreshold()
def GetBigObjectDirectory():
    """Return the directory where big objects are dumped.

    Read from the SALOME_FILE_BIG_OBJ_DIR environment variable, with `~` and
    `$VAR` references expanded.

    Raises:
    -------
    RuntimeError: when the environment variable is not set.
    """
    if SALOME_FILE_BIG_OBJ_DIR not in os.environ:
        raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
    raw_directory = os.environ[SALOME_FILE_BIG_OBJ_DIR]
    expanded = os.path.expandvars( raw_directory )
    return os.path.expanduser( expanded )
def GetBigObjectFileName():
    """
    Return a filename in the most secure manner (see tempfile documentation)

    The NamedTemporaryFile is closed (and thus deleted) on exit of the with
    block: only the unique name is kept and returned.
    """
    # Fix: capturing and returning the generated name was lost in the
    # garbled listing.
    with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
        ret = f.name
    return ret
class BigObjectOnDiskBase:
    # Base class of the on-disk proxies: holds the dump file name and the
    # client/server ownership flag (_destroy).
    # NOTE(review): listing heavily elided — several method headers
    # (incrRef/decrRef, __del__, __float__/__int__/__str__) are missing below;
    # confirm against the full source.
    def __init__(self, fileName, objSerialized):
        """
        :param fileName: the file used to dump into.
        :param objSerialized: the object in pickled form
        :type objSerialized: bytes
        """
        self._filename = fileName
        # attribute _destroy is here to tell client side or server side
        # only client side can be with _destroy set to True. server side due to risk of concurrency
        # so pickled form of self must be done with this attribute set to False.
        self._destroy = False
        self.__dumpIntoFile(objSerialized)

    def getDestroyStatus(self):
        # presumably `return self._destroy` — body elided in the listing
        # incrRef branch: bump the shared refcount of the dump file
        IncrRefInFile( self._filename )
        # should never happen !
        RuntimeError("Invalid call to incrRef !")  # NOTE(review): exception built but not raised — likely a latent bug in the original
        # decrRef branch: release one reference on the dump file
        DecrRefInFile( self._filename )
        # should never happen !
        RuntimeError("Invalid call to decrRef !")  # NOTE(review): same remark — not raised

    def unlinkOnDestructor(self):
        # presumably sets self._destroy = True (client side) — body elided

    def doNotTouchFile(self):
        """
        Method called slave side. The life cycle management of file is client side not slave side.
        """
        self._destroy = False
        # __del__ branch (header elided): drop our reference when destroyed
        DecrRefInFile( self._filename )

    def getFileName(self):
        return self._filename

    def __dumpIntoFile(self, objSerialized):
        DumpInFile( objSerialized, self._filename )

    def get(self, visitor = None):
        # Load the pickled object back from disk (visitor used for accounting).
        obj = GetObjectFromFile( self._filename, visitor )
        # __float__ conversion (header elided):
        return float( self.get() )
        # __int__ conversion (header elided):
        return int( self.get() )
        # __str__ conversion (header elided): only legal for string payloads
        if isinstance(obj,str):
            # presumably `return obj` — elided
        raise RuntimeError("Not a string")
class BigObjectOnDisk(BigObjectOnDiskBase):
    """On-disk proxy for a scalar (non sequence) big object."""
    def __init__(self, fileName, objSerialized):
        super().__init__(fileName, objSerialized)
class BigObjectOnDiskListElement(BigObjectOnDiskBase):
    # Lazy view on one element of a proxified sequence: keeps only the file
    # name, the element position and the sequence length; never owns the file.
    def __init__(self, pos, length, fileName):
        self._filename = fileName
        self._destroy = False
        # NOTE(review): `self._pos = pos` is presumably assigned here — line
        # elided in the listing.
        self._length = length

    def get(self, visitor = None):
        # Load the whole sequence from disk, then index the wanted element.
        fullObj = BigObjectOnDiskBase.get(self, visitor)
        return fullObj[ self._pos ]

    def __getitem__(self, i):
        # presumably `return self.get()[i]` — body elided in the listing
        # __len__ (header elided):
        return len(self.get())
class BigObjectOnDiskSequence(BigObjectOnDiskBase):
    # On-disk proxy common to list/tuple payloads; remembers the length so
    # element views can be produced without loading the file.
    def __init__(self, length, fileName, objSerialized):
        BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
        self._length = length

    def __getitem__(self, i):
        # Return a lazy per-element view instead of loading the sequence.
        return BigObjectOnDiskListElement(i, self._length, self.getFileName())
        # NOTE(review): listing truncated — a __len__ method presumably
        # follows here; confirm against the full source.
class BigObjectOnDiskList(BigObjectOnDiskSequence):
    """On-disk proxy specialisation for list payloads."""
    def __init__(self, length, fileName, objSerialized):
        super().__init__(length, fileName, objSerialized)
class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
    """On-disk proxy specialisation for tuple payloads."""
    def __init__(self, length, fileName, objSerialized):
        super().__init__(length, fileName, objSerialized)
def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
    """
    This method return a proxy instance of pickled form of object given in input.

    Args:
    -----
    obj (pickelable type) : object to be proxified
    pickleObjInit (bytes) : Optionnal. Original pickeled form of object to be proxyfied if already computed. If not this method generate it
    visitor : optional accounting visitor fed with the pickle size and file name

    Returns:
    --------
    BigObjectOnDiskBase: proxy instance
    """
    pickleObj = pickleObjInit
    if pickleObj is None:
        pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
    fileName = GetBigObjectFileName()
    # Feed the accounting visitor when one is provided.
    if visitor:
        visitor.setHDDMem( len(pickleObj) )
        visitor.setFileName(fileName)
    # Pick the proxy flavour matching the payload type so sequences keep
    # element-wise lazy access.
    if isinstance( obj, list):
        proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
    elif isinstance( obj, tuple):
        proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
    else:
        # Fix: the final else-branch and the return were lost in the garbled
        # listing — scalars fell through without a proxy being returned.
        proxyObj = BigObjectOnDisk( fileName , pickleObj )
    return proxyObj
def SpoolPickleObject( obj, visitor = None ):
    """Pickle *obj*; when the pickled size exceeds the big-object threshold the
    object is spooled to disk and the pickled *proxy* is returned instead.

    Returns:
    --------
    bytes: pickled form of either obj itself or of its on-disk proxy
    """
    with InOutputObjVisitorCM(visitor) as v:
        pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
        # Fix: both return statements were lost in the garbled listing —
        # the function produced nothing for the caller.
        if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
            return pickleObjInit
        else:
            proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
            pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
            return pickleProxy
361 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
def UnProxyObjectSimple( obj, visitor = None ):
    """
    Method to be called in Remote mode. Alterate the obj _status attribute.
    Because the slave process does not participate in the reference counting

    Args:
    -----
    visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
    """
    # NOTE(review): listing truncated — the bodies of the branches below
    # (proxy resolution via obj.get, list recursion setup, final returns) are
    # partially missing; confirm against the full source.
    with InOutputObjVisitorCM(visitor) as v:
        logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
        if isinstance(obj,BigObjectOnDiskBase):
            # presumably marks the proxy untouched slave-side and returns obj.get(v.visitor())
        elif isinstance( obj, list):
            # recursive un-proxification of each element
            retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
def UnProxyObjectSimpleLocal( obj ):
    """
    Method to be called in Local mode. Do not alterate the PyObj counter
    """
    # NOTE(review): listing truncated — branch bodies and the fallthrough
    # return are missing; confirm against the full source.
    if isinstance(obj,BigObjectOnDiskBase):
        # presumably `return obj.get()` — elided
    elif isinstance( obj, list):
        # recursive un-proxification of each element
        retObj.append( UnProxyObjectSimpleLocal(elt) )
    # Interior of class FileHolder (class header outside this view): simple
    # wrapper around a file name.
    def __init__(self, fileName):
        self._filename = fileName
        # `filename` property getter (header elided in the listing):
        return self._filename
class FileDeleter(FileHolder):
    """FileHolder flavour that removes its file from disk when the Python
    object is garbage collected."""
    def __init__(self, fileName):
        super().__init__( fileName )

    def __del__(self):
        # Fix: the `def __del__(self):` header was lost in the garbled
        # listing, leaving the cleanup code floating. Best-effort removal:
        # only unlink when the file still exists.
        if os.path.exists( self._filename ):
            os.unlink( self._filename )
class MonitoringInfo:
    # Value object describing a launched monitoring subprocess: the python
    # script to run, its polling period, its output file and its pid.
    # NOTE(review): listing truncated — @property decorators and the pid
    # getter/setter bodies are partially missing; confirm against the full
    # source.
    def __init__(self, pyFileName, intervalInMs, outFileName, pid):
        self._py_file_name = pyFileName
        self._interval_in_ms = intervalInMs
        self._out_file_name = outFileName
        # presumably `self._pid = pid` follows — elided in the listing

    def pyFileName(self):
        # property: FileHolder/FileDeleter of the monitoring python script
        return self._py_file_name

    def pid(self, value):
        # pid setter — body elided in the listing

    def outFileName(self):
        # property: FileHolder/FileDeleter of the monitoring output file
        return self._out_file_name

    def intervalInMs(self):
        # property: polling period of the monitoring loop
        return self._interval_in_ms
def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
    """
    This method loops indefinitely every intervalInMs milliseconds to scan
    number of inodes and size of content recursively included into the in input directory.

    Args:
    -----
    outFileName (str) : name of file inside the results will be written. If None a new file is generated

    See also CPUMemoryMonitoring
    """
    # NOTE(review): listing heavily truncated — parts of the generated
    # monitoring script template below are missing; confirm against the full
    # source before editing.
    dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
    # outFileNameSave stores the content of outFileName during phase of dumping
    with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
        outFileNameSave = f.name
    with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
        # presumably `tempPyFile = f.name` — elided in the listing
        tempOutFile = outFileName
        if tempOutFile is None:
            tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
    # Generated script: loops forever, appending timestamp / inode count /
    # directory size to the output file every intervalInMs ms.
    with open(tempPyFile,"w") as f:
        f.write("""
import subprocess as sp
with open("{tempOutFile}","a") as f:
  f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
  f.write( "{{}}\\n".format( "{intervalInMs}" ) )
  nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]), ), shell = True).decode().strip()
  st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
  szOfDirStr = re.split("[\s]+",st)[0]
  f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
  f.write( "{{}}\\n".format( str( nbinodes ) ) )
  f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
  time.sleep( {intervalInMs} / 1000.0 )
""".format( **locals()))
    logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
    # The generated script is deleted with the returned MonitoringInfo; the
    # output file is deleted only when generated here (not caller-supplied).
    pyFileName = FileDeleter( tempPyFile )
    if outFileName is None:
        outFileName = FileDeleter( tempOutFile )
    else:
        outFileName = FileHolder(outFileName)
    return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
    """
    Launch a subprocess monitoring self process.
    This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
    CPU usage and RSS memory of the calling process.
    Communication between subprocess and self is done by file.

    Args:
    -----
    outFileName (str) : name of file inside the results will be written. If None a new file is generated

    See also FileSystemMonitoring
    """
    # NOTE(review): listing truncated — pid retrieval and parts of the
    # generated psutil script are missing; confirm against the full source.
    def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
        # Generate the psutil-based monitoring script in a unique temp file.
        with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
            # presumably `tempPyFile = f.name` — elided in the listing
            tempOutFile = outFileName
            if tempOutFile is None:
                tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
        with open(tempPyFile,"w") as f:
            # Script template: sums CPU percent and RSS of the monitored
            # process and its children, appending samples to the output file.
            f.write("""import psutil
process = psutil.Process( pid )
def getChargeOf( p ):
  a,b = p.cpu_percent(), p.memory_info().rss
  for c in p.children():
    a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
with open("{}","a") as f:
  f.write( "{{}}\\n".format( "{}" ) )
  cpu,mem_rss = getChargeOf( process )
  f.write( "{{}}\\n".format( str( cpu ) ) )
  f.write( "{{}}\\n".format( str( mem_rss ) ) )
  time.sleep( {} / 1000.0 )
""".format(pid, tempOutFile, intervalInMs, intervalInMs))
        # Output file ownership mirrors FileSystemMonitoring: delete only
        # when generated here.
        if outFileName is None:
            autoOutFile = FileDeleter(tempOutFile)
        else:
            autoOutFile = FileHolder(tempOutFile)
        return FileDeleter(tempPyFile),autoOutFile
    pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
    return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
class GenericPythonMonitoringLauncherCtxMgr:
    # Context manager starting a monitoring subprocess on __enter__ and
    # stopping it on __exit__.
    def __init__(self, monitoringParams):
        """
        Args:
        -----
        monitoringParams (MonitoringInfo)
        """
        self._monitoring_params = monitoringParams
        # __enter__ (header elided in the listing): launch the monitoring
        # subprocess and record its pid.
        pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
        self._monitoring_params.pid = pid
        return self._monitoring_params

    def __exit__(self,exctype, exc, tb):
        StopMonitoring( self._monitoring_params )
def StopMonitoring( monitoringInfo ):
    """
    Kill monitoring subprocess.

    Args:
    -----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    """
    KernelBasis.StopMonitoring(monitoringInfo.pid)
    # Interior of class CPUMemInfo (class header outside this view): holds the
    # samples produced by CPUMemoryMonitoring.
    def __init__(self, intervalInMs, cpu, mem_rss):
        """
        Args:
        -----
        cpu (list<float>) CPU usage
        mem_rss (list<int>) rss memory usage
        """
        self._interval_in_ms = intervalInMs
        # Zip the two sample streams into (cpu, mem_rss) pairs.
        self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
        # __str__ (header elided in the listing):
        st = """Interval in ms : {self.intervalInMs}
""".format( **locals() )
        # presumably `return st` — elided

    def intervalInMs(self):
        # property: sampling period
        return self._interval_in_ms
        # `data` property (header elided):
        # list of triplets. First param of pair is cpu usage
        # Second param of pair is memory usage
def ReadCPUMemInfoInternal( fileName ):
    # Parse the file written by the CPU/memory monitoring subprocess:
    # line 0 = interval in ms, then alternating cpu / rss samples.
    intervalInMs = 0  # NOTE(review): default initialisation presumed — line elided in the listing
    cpu = [] ; mem_rss = []
    if os.path.exists( fileName ):
        # NOTE(review): a try/except presumably wraps this parsing (the file
        # is written concurrently by the monitor) — elided in the listing.
        with open(fileName, "r") as f:
            coarseData = [ elt.strip() for elt in f.readlines() ]
        intervalInMs = int( coarseData[0] )
        coarseData = coarseData[1:]
        cpu = [float(elt) for elt in coarseData[::2]]
        mem_rss = [ int(elt) for elt in coarseData[1::2]]
    return CPUMemInfo(intervalInMs,cpu,mem_rss)
def ReadCPUMemInfo( monitoringInfo ):
    """
    Retrieve CPU/Mem data of monitoring.

    Args:
    -----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring

    Returns:
    --------
    CPUMemInfo: parsed samples
    """
    return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
    # Interior of class InodeSizeInfo (class header outside this view): holds
    # the samples produced by FileSystemMonitoring.
    def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
        """
        Args:
        -----
        timeStamps (list<datetimestruct>)
        volumeOfDir (list<str>)
        """
        self._dir_name_monitored = dirNameMonitored
        self._interval_in_ms = intervalInMs
        # Zip the three sample streams into (timestamp, #inodes, size) triplets.
        self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
        # __str__ (header elided in the listing):
        st = """Filename monitored : {self.dirNameMonitored}
Interval in ms : ${self.intervalInMs}
""".format( **locals() )
        # presumably `return st` — elided

    def dirNameMonitored(self):
        # property: directory whose inode count / size is monitored
        return self._dir_name_monitored

    def intervalInMs(self):
        # property: sampling period
        return self._interval_in_ms
        # `data` property (header elided):
        # list of triplets. First param of triplet is datetimestruct
        # Second param of triplet is #inodes.
        # Third param of triplet is size.
def ReadInodeSizeInfoInternal( fileName ):
    """Parse the file produced by the FileSystemMonitoring subprocess and
    build an InodeSizeInfo out of it.

    File layout: monitored dir name, interval in ms, then repeating triplets
    (timestamp, #inodes, human-readable size).
    """
    with open(fileName, "r") as fp:
        raw_lines = [line.strip() for line in fp.readlines()]
    monitored_dir = raw_lines[0]
    interval_ms = int( raw_lines[1] )
    samples = raw_lines[2:]
    timestamps = [ datetime.datetime.fromtimestamp( float(v) ) for v in samples[::3] ]
    inode_counts = [ int(v) for v in samples[1::3] ]
    dir_volumes = samples[2::3]
    return InodeSizeInfo(monitored_dir, interval_ms, timestamps, inode_counts, dir_volumes)
def ReadInodeSizeInfo( monitoringInfo ):
    """
    Retrieve nb of inodes and size of monitoring

    Args:
    -----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring

    Returns:
    --------
    InodeSizeInfo: parsed samples
    """
    return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
class SeqByteReceiver:
    """Client-side helper fetching the full byte payload of a remote
    SenderByte servant, splitting transfers above CHUNK_SIZE into chunks to
    avoid a memory peak."""
    # 2GB limit to trigger split into chunks
    CHUNK_SIZE = 2000000000

    def __init__(self,sender):
        # Fix: the assignment of the remote sender was lost in the garbled
        # listing (self._obj is used throughout the class).
        self._obj = sender

    def __del__(self):
        # Fix: `def __del__` header lost in the listing — release the CORBA
        # reference when the receiver dies.
        self._obj.UnRegister()

    def data(self):
        """Fetch and return the complete payload as bytes."""
        # Fix: `def data` header lost in the listing (see usage
        # `argsInPy.data()` in PyScriptNode executeFirst).
        size = self._obj.getSize()
        if size <= SeqByteReceiver.CHUNK_SIZE:
            return self.fetchOneShot( size )
        else:
            return self.fetchByChunks( size )

    def fetchOneShot(self,size):
        # Single round-trip for payloads that fit under CHUNK_SIZE.
        return self._obj.sendPart(0,size)

    def fetchByChunks(self,size):
        """
        To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
        """
        data_for_split_case = bytes(0)
        EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
        iStart = 0 ; iEnd = EFF_CHUNK_SIZE
        while iStart!=iEnd and iEnd <= size:
            part = self._obj.sendPart(iStart,iEnd)
            data_for_split_case = bytes(0).join( [data_for_split_case,part] )
            iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
        return data_for_split_case
# Template of the driver script run by ExecCrashProofGeneric in a separate
# python3 subprocess. Placeholders are filled, in order, by
# FinalCode.format(codeFileName, inputContextFileName, outputContextFileName,
# outargsname, iorOfScriptLog) — see InternalExecResistant below.
# Fix: the template string was unterminated in the garbled listing and the
# placeholder lines (codeFileName / inputFileName / outputsKeys) were lost;
# restored consistently with the 5-argument format() call site.
FinalCode = """import pickle
from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
import CORBA
orb = CORBA.ORB_init([''])
codeFileName = "{}"
inputFileName = "{}"
outputFileName = "{}"
outputsKeys = {}
exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
with open(inputFileName,"rb") as f:
  context = pickle.load( f )
context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
with open(codeFileName,"r") as f:
  code = f.read()
exec( code , context )
# filter part of context to be exported to father process
context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
with open(outputFileName,"wb") as f:
  pickle.dump( context, f )
"""
class PythonFunctionEvaluatorParams:
    # Bundle of the four files involved in one out-of-process execution:
    # driver script, user code, pickled input context, pickled output context.
    # NOTE(review): listing truncated — @property decorators and some method
    # headers below are missing; confirm against the full source.
    def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
        self._main_filename = mainFileName
        self._code_filename = codeFileName
        self._in_context_filename = inContextFileName
        self._out_context_filename = outContextFileName
        # `result` property (header elided): unpickle the output context
        # produced by the subprocess.
        with open(self._out_context_filename,"rb") as f:
            return pickle.load( f )

    def destroyOnOK(self):
        # Successful run: remove all four temporary files.
        for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
            if os.path.exists( fileToDestroy ):
                os.unlink( fileToDestroy )

    def destroyOnKO(self, containerRef):
        """
        Called in the context of failure with replay mode activated
        """
        # Only the output context is removed; the replay files are kept and
        # registered on the container for post-mortem analysis.
        for fileToDestroy in [self._out_context_filename]:
            if os.path.exists( fileToDestroy ):
                os.unlink( fileToDestroy )
        # register to container files group associated to the execution
        containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
        # replay-command property (header elided):
        return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))

    def cleanOperations(self):
        # Human-readable shell command removing the replay files.
        return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )

    def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
        # Build the failure message; replay/clean hints depend on
        # keepFilesToReplay. NOTE(review): surrounding branch structure is
        # partially elided in this listing.
        return f"return with non zero code ({returnCode})"
        if keepFilesToReplay:
            return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
{self.cleanOperations}
"""
        return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
"""
def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay ):
    """
    Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.

    Args:
    -----
    code (str) : python code to be executed using context
    context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
    outargsname (list<str>) : list of arguments to be exported
    containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
    instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
    keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.

    Returns:
    --------
    ScriptExecInfo : instance serverside

    Note: context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
    """
    # NOTE(review): listing truncated in several places below (imports,
    # success-path branch structure); confirm against the full source.
    import subprocess as sp

    def InternalExecResistant( code, context, outargsname):
        # Materialize code / input context / driver script as files, run the
        # driver in a python3 subprocess and collect its outcome.
        orb = CORBA.ORB_init([''])
        iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
        EXEC_CODE_FNAME_PXF = "execsafe_"
        def RetrieveUniquePartFromPfx( fname ):
            # Extract the unique token tempfile generated inside the name.
            return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
        with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
            # presumably codeFd.write(code)/flush — elided in the listing
            codeFileName = os.path.basename( codeFd.name )
        contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
        with open(contextFileName,"wb") as contextFd:
            pickle.dump( context, contextFd)
        resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
        mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
        with open(mainExecFileName,"w") as f:
            f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
        p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
        stdout, stderr = p.communicate()
        returnCode = p.returncode
        return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)

    ret = instanceOfLogOfCurrentSession._current_instance
    returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
    # Forward the child's output to this process' streams.
    sys.stdout.write( stdout.decode() ) ; sys.stdout.flush()
    sys.stderr.write( stderr.decode() ) ; sys.stderr.flush()
    # NOTE(review): the two writes above follow separate decode assignments in
    # the original listing (stdout = stdout.decode(); stderr = stderr.decode()).
    # Success path (returnCode == 0 — guard elided in the listing): fetch the
    # remote log, merge the produced context and clean all temp files.
    pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
    if len(pcklData) > 0:
        ret = pickle.loads( pcklData )
    context.update( evParams.result )
    evParams.destroyOnOK()
    # Failure path (elided guard): keep or destroy replay files per flag.
    if keepFilesToReplay:
        evParams.destroyOnKO( containerRef )
    else:
        evParams.destroyOnOK()
    raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    # Out-of-process execution keeping replay files on failure.
    return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True)
def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    # Out-of-process execution destroying all temp files on failure.
    return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False)
def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Run *code* in-process inside *context* and return the current
    ScriptExecInfo of the session log.

    Same signature as the crash-proof variants so executeNow can dispatch to
    any of them; containerRef and outargsname are unused here.
    """
    session_log = instanceOfLogOfCurrentSession
    exec( code, context )
    return session_log._current_instance
class LogOfCurrentExecutionSessionAbs(abc.ABC):
    # Base of session-log implementations; wraps a fresh ScriptExecInfo.
    # NOTE(review): listing truncated — the __init__ header and an
    # @abc.abstractmethod decorator are presumably elided; confirm.
        self._current_instance = ScriptExecInfo()

    def addInfoOnLevel2(self, key, value):
        # Record a measurement on the current ScriptExecInfo instance.
        setattr(self._current_instance,key,value)

    def addFreestyleAndFlush(self, value):
        # Abstract: concrete subclasses push the freestyle payload upstream.
        raise RuntimeError("Must be overloaded")
class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
    # Session log connected to a live centralized (remote CORBA) log holder.
    def __init__(self, handleToCentralizedInst):
        # NOTE(review): a super().__init__() call is presumably elided here.
        self._remote_handle = handleToCentralizedInst

    def addFreestyleAndFlush(self, value):
        # Store the freestyle payload then push the whole state upstream.
        self._current_instance.freestyle = value
        self.finalizeAndPushToMaster()

    def finalizeAndPushToMaster(self):
        # Serialize the current ScriptExecInfo to the remote holder.
        self._remote_handle.assign( pickle.dumps( self._current_instance ) )
class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
    """
    This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
    """
    def __init__(self, handleToCentralizedInst = None):
        # NOTE(review): body elided in the listing (presumably super().__init__()).

    def addFreestyleAndFlush(self, value):
        # No-op in replay mode — there is no server to push to.
class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
    """The implementation of the PyScriptNode CORBA IDL that executes a script"""
    # NOTE(review): this listing is elided in many places (try/except framing,
    # some loop headers and return statements, decorators); hedged notes mark
    # the gaps — confirm against the full source before editing.
    def __init__(self, nodeName, code, poa, my_container, logscript):
        """Initialize the node : compilation in the local context"""
        Generic.__init__(self,poa)
        self.nodeName=nodeName
        # NOTE(review): `self.code = code` and `self.context = {}` are
        # presumably assigned here (self.code is used by the OutOfProcess
        # subclasses, self.context below) — lines elided in the listing.
        self.my_container_py = my_container
        self.my_container=my_container._container
        # Register the source in linecache so tracebacks show the node code.
        linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
        self.ccode=compile(code,nodeName,'exec')
        self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
        self._log_script = logscript
        self._current_execution_session = None
        sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session

    def executeNow(self, outargsname):
        # Abstract execution strategy — presumably decorated with
        # @abc.abstractmethod (decorator elided in the listing).
        raise RuntimeError("Must be overloaded")
        # __del__ (header elided in the listing):
        # force removal of self.context. Don't know why it's not done by default
        self.removeAllVarsInContext()

    def getContainer(self):
        # CORBA accessor: container hosting this node.
        return self.my_container

    def defineNewCustomVar(self,varName,valueOfVar):
        # Install a caller-pickled value as a new variable of the context.
        self.context[varName] = pickle.loads(valueOfVar)

    def executeAnotherPieceOfCode(self,code):
        """Called for initialization of container lodging self."""
        # NOTE(review): try/except framing elided; the raise below is the
        # except-branch error report.
        ccode=compile(code,self.nodeName,'exec')
        exec(ccode, self.context)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))

    def assignNewCompiledCode(self,codeStr):
        # Replace the compiled script of this node.
        # NOTE(review): try/except framing elided.
        self.ccode=compile(codeStr,self.nodeName,'exec')
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))

    def executeSimple(self, key, val):
        """
        Same as execute method except that no pickelization mecanism is implied here. No output is expected
        """
        # NOTE(review): try: elided before the two lines below.
        self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
        exec(self.ccode,self.context)
        # except-branch:
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))

    def execute(self,outargsname,argsin):
        """Execute the script stored in attribute ccode with pickled args (argsin)"""
        # NOTE(review): try:, `argsout=[]` and `return argsout` elided.
        argsname,kws=pickle.loads(argsin)
        self.context.update(kws)
        exec(self.ccode, self.context)
        # Collect requested outputs from the execution context.
        for arg in outargsname:
            if arg not in self.context:
                raise KeyError("There is no variable %s in context" % arg)
            argsout.append(self.context[arg])
        argsout=pickle.dumps(tuple(argsout),-1)
        # except-branch:
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))

    def executeFirst(self,argsin):
        """ Same than first part of self.execute to reduce memory peak."""
        def ArgInMananger(self,argsin):
            # Fetch and unpickle the input kwargs; scoped in a function so the
            # SeqByteReceiver (and its buffer) dies as soon as possible.
            argsInPy = SeqByteReceiver( argsin )
            data = argsInPy.data()
            self.addInfoOnLevel2("inputMem",len(data))
            _,kws=pickle.loads(data)
            # presumably `return kws` — elided in the listing
        # NOTE(review): try: elided before the lines below.
        self.beginOfCurrentExecutionSession()
        self.addTimeInfoOnLevel2("startInputTime")
        # to force call of SeqByteReceiver's destructor
        kws = ArgInMananger(self,argsin)
        vis = InOutputObjVisitor()
        # NOTE(review): `for elt in kws:` loop header elided before this line.
        # fetch real data if necessary
        kws[elt] = UnProxyObjectSimple( kws[elt],vis)
        self.addInfoOnLevel2("inputHDDMem",vis)
        self.context.update(kws)
        self.addTimeInfoOnLevel2("endInputTime")
        # except-branch:
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))

    def executeSecond(self,outargsname):
        """ Same than second part of self.execute to reduce memory peak."""
        # NOTE(review): try:, `argsout=[]`, `ret=[]`, `outputMem=0`, the
        # output `for arg in argsout:` header and the final return are elided.
        self.addTimeInfoOnLevel2("startExecTime")
        self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
        # Run the script under CPU/memory monitoring.
        with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams:
            self._current_execution_session._current_instance = self.executeNow( outargsname )
            cpumeminfo = ReadCPUMemInfo( monitoringParams )
        self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
        del monitoringParams
        self.addTimeInfoOnLevel2("endExecTime")
        self.addTimeInfoOnLevel2("startOutputTime")
        # Collect requested outputs from the execution context.
        for arg in outargsname:
            if arg not in self.context:
                raise KeyError("There is no variable %s in context" % arg)
            argsout.append(self.context[arg])
        vis = InOutputObjVisitor()
        # the proxy mechanism is caught here
        argPickle = SpoolPickleObject( arg, vis )
        retArg = SenderByte_i( self.poa,argPickle )
        id_o = self.poa.activate_object(retArg)
        retObj = self.poa.id_to_reference(id_o)
        ret.append( retObj._narrow( SALOME.SenderByte ) )
        outputMem += len(argPickle)
        self.addInfoOnLevel2("outputMem",outputMem)
        self.addInfoOnLevel2("outputHDDMem",vis)
        self.addTimeInfoOnLevel2("endOutputTime")
        self.endOfCurrentExecutionSession()
        # except-branch:
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))

    def listAllVarsInContext(self):
        # All user variables of the context (dunders and the container entry
        # are filtered out).
        pat = re.compile("^__([a-z]+)__$")
        return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]

    def removeAllVarsInContext(self):
        for elt in self.listAllVarsInContext():
            del self.context[elt]

    def getValueOfVarInContext(self,varName):
        # NOTE(review): try: elided before the return below.
        return pickle.dumps(self.context[varName],-1)
        # except-branch:
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))

    def assignVarInContext(self, varName, value):
        # NOTE(review): try: elided; note the [0] — the context variable is
        # presumably a one-slot mutable holder; confirm.
        self.context[varName][0] = pickle.loads(value)
        # except-branch:
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))

    def callMethodOnVarInContext(self, varName, methodName, args):
        # NOTE(review): try: elided before the return below.
        return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
        # except-branch:
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))

    def beginOfCurrentExecutionSession(self):
        # Open a new session log and publish it in the execution context.
        self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
        self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session

    def endOfCurrentExecutionSession(self):
        # Push the collected measurements to the master and close the session.
        self._current_execution_session.finalizeAndPushToMaster()
        self._current_execution_session = None

    def addInfoOnLevel2(self, key, value):
        self._current_execution_session.addInfoOnLevel2(key, value)

    def addTimeInfoOnLevel2(self, key):
        # Timestamp the given measurement key with the current wall clock.
        from datetime import datetime
        self._current_execution_session.addInfoOnLevel2(key,datetime.now())
class PyScriptNode_i(PyScriptNode_Abstract_i):
    # In-process execution strategy: runs the compiled code directly.
    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        return ExecLocal(self.ccode,self.context,outargsname,self.my_container,self._current_execution_session)
class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
    # Crash-proof strategy: runs in a subprocess, no replay files on failure.
    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        # Note: passes the raw source (self.code — presumably set in the base
        # __init__, line elided in this listing), not self.ccode, because the
        # subprocess re-executes the text.
        return ExecCrashProofWithoutReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)
class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
    # Crash-proof strategy: runs in a subprocess, replay files kept on failure.
    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        # Raw source passed for the same reason as in PyScriptNode_OutOfProcess_i.
        return ExecCrashProofWithReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)