1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # File : SALOME_PyNode.py
22 # Author : Christian CAREMOLI, EDF
33 from pathlib import Path
39 from SALOME_ContainerHelper import ScriptExecInfo
41 MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
43 MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
45 MY_KEY_TO_DETECT_FINISH = "neib av tuot"
47 class Generic(SALOME__POA.GenericObj):
48 """A Python implementation of the GenericObj CORBA IDL"""
49 def __init__(self,poa):
54 #print("Register called : %d"%self.cnt)
58 #print("UnRegister called : %d"%self.cnt)
61 oid=self.poa.servant_to_id(self)
62 self.poa.deactivate_object(oid)
65 print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
69 #print("Destuctor called")
72 class PyNode_i (Engines__POA.PyNode,Generic):
73 """The implementation of the PyNode CORBA IDL"""
74 def __init__(self, nodeName,code,poa,my_container):
75 """Initialize the node : compilation in the local context"""
76 Generic.__init__(self,poa)
77 self.nodeName=nodeName
79 self.my_container=my_container._container
80 linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
81 ccode=compile(code,nodeName,'exec')
83 self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
84 exec(ccode, self.context)
def getContainer(self):
    """Return the Engines.Container CORBA reference hosting this PyNode."""
    return self.my_container
def defineNewCustomVar(self,varName,valueOfVar):
    """Unpickle *valueOfVar* and bind it under *varName* in the node's execution context.

    NOTE(review): pickle.loads on data received over CORBA — safe only with
    trusted peers; do not expose to untrusted callers.
    """
    self.context[varName] = pickle.loads(valueOfVar)
99 def executeAnotherPieceOfCode(self,code):
100 """Called for initialization of container lodging self."""
102 ccode=compile(code,self.nodeName,'exec')
103 exec(ccode, self.context)
105 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
107 def execute(self,funcName,argsin):
108 """Execute the function funcName found in local context with pickled args (argsin)"""
110 argsin,kws=pickle.loads(argsin)
111 func=self.context[funcName]
112 argsout=func(*argsin,**kws)
113 argsout=pickle.dumps(argsout,-1)
116 exc_typ,exc_val,exc_fr=sys.exc_info()
117 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
118 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
120 class SenderByte_i(SALOME__POA.SenderByte,Generic):
121 def __init__(self,poa,bytesToSend):
122 Generic.__init__(self,poa)
123 self.bytesToSend = bytesToSend
126 return len(self.bytesToSend)
def sendPart(self,n1,n2):
    """Return the bytes in the half-open range [n1, n2) of the wrapped buffer."""
    return self.bytesToSend[n1:n2]
def IsRemote(hostName):
    """Tell whether *hostName* designates a machine other than the local one.

    Plain string comparison against socket.gethostname(): an alias or a
    FQDN/short-name mismatch is reported as remote.
    """
    localHostName = socket.gethostname()
    return localHostName != hostName
def RemoveFileSafe( fileName ):
    """Delete *fileName* from disk; silently do nothing when it does not exist.

    EAFP form: attempting the unlink and swallowing FileNotFoundError avoids
    the check-then-act race of the original ``os.path.exists`` guard (the file
    could vanish between the test and the unlink).
    """
    try:
        os.unlink( fileName )
    except FileNotFoundError:
        pass
139 def RetrieveRemoteFileLocallyInSameFileName( remoteHostName, fileName):
141 dn = os.path.dirname( fileName )
142 import subprocess as sp
143 p = sp.Popen(["scp","{}:{}".format(remoteHostName,fileName),dn])
146 def DestroyRemotely( remoteHostName, fileName):
147 import subprocess as sp
148 p = sp.Popen(["ssh","-qC","-oStrictHostKeyChecking=no","-oBatchMode=yes",remoteHostName, f"rm {fileName}"])
151 class CopyFileFromRemoteCtxMgr:
152 def __init__(self, hostName, fileName):
153 self._remoteHostName = hostName
154 self._fileName = Path(fileName)
155 self._isRemote = IsRemote( hostName )
158 if not self._isRemote:
160 dn = self._fileName.parent
161 logging.debug(f"[SALOME_PyNode] Creating directory {dn}")
162 dn.mkdir(parents=True, exist_ok=True)
163 RetrieveRemoteFileLocallyInSameFileName(self._remoteHostName,f"{self._fileName}")
165 def __exit__(self, exctype, exc, tb):
166 if not self._isRemote:
168 self._fileName.unlink()
170 class BigFileOnDiskBase(abc.ABC):
172 Base class in charge of managing
173 Copy or share of file accross computation Nodes
175 def __init__(self, fileName):
176 self._file_name = fileName
178 def getFileName(self):
179 return self._file_name
182 def get(self, visitor = None):
184 Method called client side of data.
186 raise NotImplementedError
191 Method called client side of data.
193 raise NotImplementedError
196 class BigFileOnDiskShare(BigFileOnDiskBase):
198 def get(self, visitor = None):
199 return GetObjectFromFile( self._file_name, visitor )
202 RemoveFileSafe( self._file_name )
204 class BigFileOnDiskSSDNoShare(BigFileOnDiskBase):
205 def __init__(self, fileName):
207 super().__init__( fileName )
208 # hostname hosting data
209 self._hostname = socket.gethostname()
211 def get(self, visitor = None):
212 logging.debug(f"[SALOME_PyNode] Directory {Path(self._file_name).parent} should be created")
213 with CopyFileFromRemoteCtxMgr(self._hostname, self._file_name):
214 return GetObjectFromFile( self._file_name, visitor )
217 if IsRemote( self._hostname ):
218 DestroyRemotely(self._hostname,self._file_name)
220 RemoveFileSafe( self._file_name )
222 BigFileOnDiskClsFromProtocol = { 0 : BigFileOnDiskShare, 1 : BigFileOnDiskSSDNoShare }
224 DicoForProxyFile = { }
226 def GetSizeOfBufferedReader(f):
228 This method returns in bytes size of a file openned.
232 f (io.IOBase): buffered reader returned by open
240 f.seek(0,io.SEEK_END)
242 f.seek(pos,io.SEEK_SET)
245 def GetObjectFromFile(fname, visitor = None):
246 with open(fname,"rb") as f:
248 visitor.setHDDMem( GetSizeOfBufferedReader(f) )
249 visitor.setFileName( fname )
253 def DumpInFile(obj,fname):
254 with open(fname,"wb") as f:
257 def IncrRefInFile(fname):
262 fnameEff = fname.getFileName()
263 if fnameEff in DicoForProxyFile:
264 DicoForProxyFile[fnameEff] += 1
266 DicoForProxyFile[fnameEff] = 2
269 def DecrRefInFile(fname):
272 :type fname: BigFileOnDiskBase
274 if fname.getFileName() not in DicoForProxyFile:
277 cnt = DicoForProxyFile[fname.getFileName()]
278 DicoForProxyFile[fname.getFileName()] -= 1
280 del DicoForProxyFile[fname.getFileName()]
def GetBigObjectOnDiskThreshold():
    """Return the size threshold in bytes beyond which objects are proxified to disk."""
    thresholdInByte = KernelBasis.GetBigObjOnDiskThreshold()
    return thresholdInByte
288 def ActivateProxyMecanismOrNot( sizeInByte ):
289 thres = GetBigObjectOnDiskThreshold()
293 return sizeInByte > thres
295 class BigObjectDirHandler(abc.ABC):
296 def __init__(self, directory):
297 self._directory = Path(directory)
301 return self._directory
306 def __exit__(self, exctype, exc, tb):
309 class BigObjectDirHandlerOnDiskShare(BigObjectDirHandler):
312 class BigObjectDirHandlerOnDiskSSDNoShare(BigObjectDirHandler):
315 logging.debug(f"[SALOME_PyNode] Creating directory {self._directory}")
316 self._directory.mkdir(parents=True, exist_ok=True)
319 def __exit__(self, exctype, exc, tb):
322 BigObjectDirHandlerFromProtocol = { 0 : BigObjectDirHandlerOnDiskShare, 1 : BigObjectDirHandlerOnDiskSSDNoShare }
324 def GetBigObjectDirectory():
326 protocol, directory = KernelBasis.GetBigObjOnDiskProtocolAndDirectory()
328 raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
329 return protocol, os.path.expanduser( os.path.expandvars( directory ) )
331 def GetBigObjectFileName():
333 Return a filename in the most secure manner (see tempfile documentation)
336 protocol, directory = GetBigObjectDirectory()
337 with BigObjectDirHandlerFromProtocol[protocol](directory) as handler:
338 with tempfile.NamedTemporaryFile(dir = handler.directory, prefix="mem_", suffix=".pckl") as f:
340 return BigFileOnDiskClsFromProtocol[protocol]( ret )
342 class BigObjectOnDiskBase:
343 def __init__(self, fileName, objSerialized):
345 :param fileName: the file used to dump into.
346 :param objSerialized: the object in pickeled form
347 :type objSerialized: bytes
349 self._filename = fileName
350 # attribute _destroy is here to tell client side or server side
351 # only client side can be with _destroy set to True. server side due to risk of concurrency
352 # so pickled form of self must be done with this attribute set to False.
353 self._destroy = False
354 self.__dumpIntoFile(objSerialized)
356 def getDestroyStatus(self):
361 IncrRefInFile( self._filename )
363 # should never happen !
364 RuntimeError("Invalid call to incrRef !")
368 DecrRefInFile( self._filename )
370 # should never happen !
371 RuntimeError("Invalid call to decrRef !")
373 def unlinkOnDestructor(self):
376 def doNotTouchFile(self):
378 Method called slave side. The life cycle management of file is client side not slave side.
380 self._destroy = False
384 DecrRefInFile( self._filename )
386 def getFileName(self):
387 return self._filename
389 def __dumpIntoFile(self, objSerialized):
390 DumpInFile( objSerialized, self._filename.getFileName() )
392 def get(self, visitor = None):
393 return self._filename.get(visitor)
396 return float( self.get() )
399 return int( self.get() )
403 if isinstance(obj,str):
406 raise RuntimeError("Not a string")
class BigObjectOnDisk(BigObjectOnDiskBase):
    """Disk proxy of a single (non-sequence) pickled object."""
    def __init__(self, fileName, objSerialized):
        super().__init__( fileName, objSerialized )
412 class BigObjectOnDiskListElement(BigObjectOnDiskBase):
413 def __init__(self, pos, length, fileName):
414 self._filename = fileName
415 self._destroy = False
417 self._length = length
419 def get(self, visitor = None):
420 fullObj = BigObjectOnDiskBase.get(self, visitor)
421 return fullObj[ self._pos ]
423 def __getitem__(self, i):
427 return len(self.get())
429 class BigObjectOnDiskSequence(BigObjectOnDiskBase):
430 def __init__(self, length, fileName, objSerialized):
431 BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
432 self._length = length
434 def __getitem__(self, i):
435 return BigObjectOnDiskListElement(i, self._length, self.getFileName())
class BigObjectOnDiskList(BigObjectOnDiskSequence):
    """Disk proxy of a pickled list."""
    def __init__(self, length, fileName, objSerialized):
        super().__init__( length, fileName, objSerialized )
class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
    """Disk proxy of a pickled tuple."""
    def __init__(self, length, fileName, objSerialized):
        super().__init__( length, fileName, objSerialized )
448 def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
450 This method return a proxy instance of pickled form of object given in input.
454 obj (pickelable type) : object to be proxified
455 pickleObjInit (bytes) : Optionnal. Original pickeled form of object to be proxyfied if already computed. If not this method generate it
459 BigObjectOnDiskBase: proxy instance
461 pickleObj = pickleObjInit
462 if pickleObj is None:
463 pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
464 fileName = GetBigObjectFileName()
466 visitor.setHDDMem( len(pickleObj) )
467 visitor.setFileName( fileName.getFileName() )
468 if isinstance( obj, list):
469 proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
470 elif isinstance( obj, tuple):
471 proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
473 proxyObj = BigObjectOnDisk( fileName , pickleObj )
476 def SpoolPickleObject( obj, visitor = None ):
478 with InOutputObjVisitorCM(visitor) as v:
479 pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
480 if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
483 proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
484 pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
487 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
489 def UnProxyObjectSimple( obj, visitor = None ):
491 Method to be called in Remote mode. Alterate the obj _status attribute.
492 Because the slave process does not participate in the reference counting
496 visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
499 with InOutputObjVisitorCM(visitor) as v:
500 logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
501 if isinstance(obj,BigObjectOnDiskBase):
504 elif isinstance( obj, list):
507 retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
512 def UnProxyObjectSimpleLocal( obj ):
514 Method to be called in Local mode. Do not alterate the PyObj counter
516 if isinstance(obj,BigObjectOnDiskBase):
518 elif isinstance( obj, list):
521 retObj.append( UnProxyObjectSimpleLocal(elt) )
527 def __init__(self, fileName):
528 self._filename = fileName
531 return self._filename
533 class FileDeleter(FileHolder):
534 def __init__(self, fileName):
535 super().__init__( fileName )
538 if os.path.exists( self._filename ):
539 os.unlink( self._filename )
541 class MonitoringInfo:
542 def __init__(self, pyFileName, intervalInMs, outFileName, pid):
543 self._py_file_name = pyFileName
544 self._interval_in_ms = intervalInMs
545 self._out_file_name = outFileName
549 def pyFileName(self):
550 return self._py_file_name
557 def pid(self, value):
561 def outFileName(self):
562 return self._out_file_name
565 def intervalInMs(self):
566 return self._interval_in_ms
568 def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
570 This method loops indefinitely every intervalInMs milliseconds to scan
571 number of inodes and size of content recursively included into the in input directory.
576 outFileName (str) : name of file inside the results will be written. If None a new file is generated
578 See also CPUMemoryMonitoring
582 dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
585 # outFileNameSave stores the content of outFileName during phase of dumping
586 with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
587 outFileNameSave = f.name
588 with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
590 tempOutFile = outFileName
591 if tempOutFile is None:
592 tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
593 with open(tempPyFile,"w") as f:
595 import subprocess as sp
600 with open("{tempOutFile}","a") as f:
601 f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
602 f.write( "{{}}\\n".format( "{intervalInMs}" ) )
606 nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]), ), shell = True).decode().strip()
611 st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
612 szOfDirStr = re.split("[\s]+",st)[0]
615 f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
616 f.write( "{{}}\\n".format( str( nbinodes ) ) )
617 f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
619 time.sleep( {intervalInMs} / 1000.0 )
620 """.format( **locals()))
621 logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
622 pyFileName = FileDeleter( tempPyFile )
623 if outFileName is None:
624 outFileName = FileDeleter( tempOutFile )
626 outFileName = FileHolder(outFileName)
627 return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
629 def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
631 Launch a subprocess monitoring self process.
632 This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
633 CPU usage and RSS memory of the calling process.
634 Communication between subprocess and self is done by file.
638 outFileName (str) : name of file inside the results will be written. If None a new file is generated
640 See also FileSystemMonitoring
643 def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
646 with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
648 tempOutFile = outFileName
649 if tempOutFile is None:
650 tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
652 with open(tempPyFile,"w") as f:
653 f.write("""import psutil
655 process = psutil.Process( pid )
656 def getChargeOf( p ):
657 a,b = p.cpu_percent(), p.memory_info().rss
659 for c in p.children():
660 a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
665 with open("{}","a") as f:
666 f.write( "{{}}\\n".format( "{}" ) )
668 cpu,mem_rss = getChargeOf( process )
669 f.write( "{{}}\\n".format( str( cpu ) ) )
670 f.write( "{{}}\\n".format( str( mem_rss ) ) )
672 time.sleep( {} / 1000.0 )
673 """.format(pid, tempOutFile, intervalInMs, intervalInMs))
674 if outFileName is None:
675 autoOutFile = FileDeleter(tempOutFile)
677 autoOutFile = FileHolder(tempOutFile)
678 return FileDeleter(tempPyFile),autoOutFile
679 pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
680 return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
682 class GenericPythonMonitoringLauncherCtxMgr:
683 def __init__(self, monitoringParams):
687 monitoringParams (MonitoringInfo)
689 self._monitoring_params = monitoringParams
692 pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
693 self._monitoring_params.pid = pid
694 return self._monitoring_params
696 def __exit__(self,exctype, exc, tb):
697 StopMonitoring( self._monitoring_params )
698 del self._monitoring_params
700 gc.collect() # force destruction of objects even in raise context
def StopMonitoring( monitoringInfo ):
    """
    Kill the monitoring subprocess.

    Args:
    -----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    """
    KernelBasis.StopMonitoring(monitoringInfo.pid)
714 def __init__(self, intervalInMs, cpu, mem_rss):
719 cpu (list<float>) CPU usage
720 mem_rss (list<int>) rss memory usage
722 self._interval_in_ms = intervalInMs
723 self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
725 st = """Interval in ms : {self.intervalInMs}
727 """.format( **locals() )
730 def intervalInMs(self):
731 return self._interval_in_ms
735 list of triplets. First param of pair is cpu usage
736 Second param of pair is memory usage
740 def ReadCPUMemInfoInternal( fileName ):
742 cpu = [] ; mem_rss = []
743 if os.path.exists( fileName ):
745 with open(fileName, "r") as f:
746 coarseData = [ elt.strip() for elt in f.readlines() ]
747 intervalInMs = int( coarseData[0] )
748 coarseData = coarseData[1:]
749 cpu = [float(elt) for elt in coarseData[::2]]
750 mem_rss = [ int(elt) for elt in coarseData[1::2]]
753 return CPUMemInfo(intervalInMs,cpu,mem_rss)
def ReadCPUMemInfo( monitoringInfo ):
    """
    Retrieve CPU/Mem data of a monitoring session.

    Args:
    -----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    """
    dumpFileName = monitoringInfo.outFileName.filename
    return ReadCPUMemInfoInternal( dumpFileName )
770 def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
774 timeStamps (list<datetimestruct>)
776 volumeOfDir (list<str>)
778 self._dir_name_monitored = dirNameMonitored
779 self._interval_in_ms = intervalInMs
780 self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
782 st = """Filename monitored : {self.dirNameMonitored}
783 Interval in ms : ${self.intervalInMs}
785 """.format( **locals() )
788 def dirNameMonitored(self):
789 return self._dir_name_monitored
791 def intervalInMs(self):
792 return self._interval_in_ms
796 list of triplets. First param of triplet is datetimestruct
797 Second param of triplet is #inodes.
798 Thirst param of triplet is size.
def ReadInodeSizeInfoInternal( fileName ):
    """Parse a filesystem-monitoring dump file into an InodeSizeInfo.

    Expected layout: line 0 = monitored directory, line 1 = interval in ms,
    then repeating triplets (timestamp, #inodes, human-readable size).
    """
    with open(fileName, "r") as f:
        lines = [ line.strip() for line in f.readlines() ]
    dirNameMonitored = lines[0]
    intervalInMs = int( lines[1] )
    payload = lines[2:]
    timeStamps = [ datetime.datetime.fromtimestamp( float(v) ) for v in payload[0::3] ]
    nbInodes = [ int(v) for v in payload[1::3] ]
    volumeOfDir = payload[2::3]
    return InodeSizeInfo(dirNameMonitored,intervalInMs,timeStamps,nbInodes,volumeOfDir)
def ReadInodeSizeInfo( monitoringInfo ):
    """
    Retrieve inode-count and directory-size data of a monitoring session.

    Args:
    -----
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    """
    dumpFileName = monitoringInfo.outFileName.filename
    return ReadInodeSizeInfoInternal( dumpFileName )
827 class SeqByteReceiver:
828 # 2GB limit to trigger split into chunks
829 CHUNK_SIZE = 2000000000
830 def __init__(self,sender):
833 self._obj.UnRegister()
836 size = self._obj.getSize()
837 if size <= SeqByteReceiver.CHUNK_SIZE:
838 return self.fetchOneShot( size )
840 return self.fetchByChunks( size )
def fetchOneShot(self,size):
    """Retrieve the whole remote byte buffer with a single sendPart call."""
    return self._obj.sendPart(0,size)
def fetchByChunks(self,size):
    """
    To avoid a memory peak, parts over 2GB are fetched piecewise using
    EFF_CHUNK_SIZE-sized sendPart calls, then joined once at the end.
    """
    effChunkSize = SeqByteReceiver.CHUNK_SIZE // 8
    parts = []
    begin, end = 0, effChunkSize
    while begin != end and end <= size:
        parts.append( self._obj.sendPart(begin,end) )
        begin, end = end, min(end + effChunkSize, size)
    return b"".join(parts)
856 FinalCode = """import pickle
857 from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
861 # WorkDir may be important to replay : "{}"
862 orb = CORBA.ORB_init([''])
864 codeFileName = os.path.join( caseDirectory, "{}" )
865 inputFileName = os.path.join( caseDirectory, "{}" )
866 outputFileName = os.path.join( caseDirectory, "{}" )
869 exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
870 with open(inputFileName,"rb") as f:
871 context = pickle.load( f )
872 context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
873 with open(codeFileName,"r") as f:
876 exec( code , context )
877 # filter part of context to be exported to father process
878 context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
880 with open(outputFileName,"wb") as f:
881 pickle.dump( context, f )
884 class PythonFunctionEvaluatorParams:
def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
    """Record the four file names making up a replayable subprocess execution.

    :param mainFileName: driver script launched with python3
    :param codeFileName: file holding the user code executed by the driver
    :param inContextFileName: pickled input-context file
    :param outContextFileName: pickled output-context file produced by the run
    """
    self._main_filename = mainFileName
    self._code_filename = codeFileName
    self._in_context_filename = inContextFileName
    self._out_context_filename = outContextFileName
893 with open(self._out_context_filename,"rb") as f:
894 return pickle.load( f )
def destroyOnOK(self):
    """Delete every file of the run (driver, code, input and output contexts).

    Called after a successful execution; files already gone are skipped.
    """
    allFiles = ( self._main_filename, self._code_filename,
                 self._in_context_filename, self._out_context_filename )
    for candidate in allFiles:
        if os.path.exists( candidate ):
            os.unlink( candidate )
def destroyOnKO(self, containerRef):
    """
    Called in the context of failure with replay mode activated.

    Removes only the output-context file; the replay inputs (driver, code,
    input context) are registered into *containerRef*'s log file group so the
    user can rerun the failed case.
    """
    if os.path.exists( self._out_context_filename ):
        os.unlink( self._out_context_filename )
    # register to container the files group associated to this failed execution
    containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
910 return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))
def cleanOperations(self):
    """Return a shell one-liner the user can run to delete the replay files.

    NOTE(review): only the main file name is reduced to its basename; the code
    and input-context entries are passed as-is — inconsistent when they are
    absolute paths (rm still works after the cd, but consider basenaming all).
    """
    return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )
917 def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
919 return f"return with non zero code ({returnCode})"
922 if keepFilesToReplay:
923 return f"""return with non zero code ({returnCode})
925 Looks like a hard crash as returnCode {returnCode} != 0
927 {self.cleanOperations}
931 return f"""return with non zero code ({returnCode})
933 Looks like a hard crash as returnCode {returnCode} != 0
937 def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay, closeEyesOnErrorAtExit):
939 Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.
944 code (str) : python code to be executed using context
945 context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
946 outargsname (list<str>) : list of arguments to be exported
947 containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
948 instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
949 keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
950 closeEyesOnErrorAtExit (bool) : if True in case of crash of subprocess, if MY_KEY_TO_DETECT_FINISH is displayed at the end of stdout
955 ScriptExecInfo : instance serverside
960 context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
964 import subprocess as sp
967 def IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr ):
968 def StdErrTreatment(closeEyesOnErrorAtExit , stderr):
969 if not closeEyesOnErrorAtExit:
972 return stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
974 return True,StdErrTreatment(closeEyesOnErrorAtExit , stderr)
975 if not closeEyesOnErrorAtExit:
977 if stderr[-len(MY_KEY_TO_DETECT_FINISH):] == MY_KEY_TO_DETECT_FINISH:
978 return True,stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
983 def InternalExecResistant( code, context, outargsname):
985 orb = CORBA.ORB_init([''])
986 iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
988 EXEC_CODE_FNAME_PXF = "execsafe_"
989 def RetrieveUniquePartFromPfx( fname ):
990 return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
991 dirForReplayFiles = KernelBasis.GetDirectoryForReplayFiles()
992 if not dirForReplayFiles:
993 raise RuntimeError("You are in context of exec resistant you have to position Directory hosting these files properly")
994 with tempfile.NamedTemporaryFile(dir=dirForReplayFiles,prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
995 codeFd.write( "{}\n".format( containerRef.get_startup_code() ) )
997 if closeEyesOnErrorAtExit:
1000 sys.stderr.write({!r})
1001 sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
1003 codeFileName = os.path.basename( codeFd.name )
1004 contextFileName = os.path.join( dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
1005 with open(contextFileName,"wb") as contextFd:
1006 pickle.dump( context, contextFd)
1007 resFileName = os.path.join( dirForReplayFiles, "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
1008 mainExecFileName = os.path.join( dirForReplayFiles, "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
1009 with open(mainExecFileName,"w") as f:
1010 f.write( FinalCode.format( os.getcwd(), dirForReplayFiles, codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
1011 for iTry in range( KernelBasis.GetNumberOfRetry() ):
1013 print( "WARNING : Retry # {}. Following code has generated non zero return code ( {} ). Trying again ... \n{}".format( iTry, returnCode, code ) )
1014 p = sp.Popen(["python3", mainExecFileName],cwd = os.getcwd(),stdout = sp.PIPE, stderr = sp.PIPE)
1015 stdout, stderr = p.communicate()
1016 returnCode = p.returncode
1019 return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
1020 ret = instanceOfLogOfCurrentSession._current_instance
1021 returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
1022 stdout = stdout.decode()
1023 stderr = stderr.decode()
1024 sys.stdout.write( stdout ) ; sys.stdout.flush()
1025 isOK, stderr = IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr )
1026 sys.stderr.write( stderr ) ; sys.stderr.flush()
1028 pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
1029 if len(pcklData) > 0:
1030 ret = pickle.loads( pcklData )
1031 context.update( evParams.result )
1032 evParams.destroyOnOK()
1034 print( "WARNING : Following code has generated non zero return code ( {} ) but considered as OK\n{}".format( returnCode, code ) )
1037 if keepFilesToReplay:
1038 evParams.destroyOnKO( containerRef )
1040 evParams.destroyOnOK()
1041 raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Crash-proof execution keeping replay files on failure; strict stderr handling."""
    return ExecCrashProofGeneric( code, context, outargsname, containerRef,
                                  instanceOfLogOfCurrentSession, True, False )
def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Crash-proof execution discarding replay files on failure; strict stderr handling."""
    return ExecCrashProofGeneric( code, context, outargsname, containerRef,
                                  instanceOfLogOfCurrentSession, False, False )
def ExecCrashProofWithReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Fault-tolerant variant: keep replay files and tolerate a crash-at-exit when the finish marker was emitted."""
    return ExecCrashProofGeneric( code, context, outargsname, containerRef,
                                  instanceOfLogOfCurrentSession, True, True )
def ExecCrashProofWithoutReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Fault-tolerant variant: discard replay files and tolerate a crash-at-exit when the finish marker was emitted."""
    return ExecCrashProofGeneric( code, context, outargsname, containerRef,
                                  instanceOfLogOfCurrentSession, False, True )
def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Execute *code* in-process with *context* as its globals dictionary.

    outargsname and containerRef are unused here; they exist so ExecLocal
    shares the signature of the ExecCrashProof* execution strategies.
    Returns the current ScriptExecInfo held by the logging session.
    """
    exec( code, context )
    currentInfo = instanceOfLogOfCurrentSession._current_instance
    return currentInfo
1059 class LogOfCurrentExecutionSessionAbs(abc.ABC):
1061 self._current_instance = ScriptExecInfo()
1063 def addInfoOnLevel2(self, key, value):
1064 setattr(self._current_instance,key,value)
1067 def addFreestyleAndFlush(self, value):
1068 raise RuntimeError("Must be overloaded")
1070 class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
1071 def __init__(self, handleToCentralizedInst):
1073 self._remote_handle = handleToCentralizedInst
1075 def addFreestyleAndFlush(self, value):
1076 self._current_instance.freestyle = value
1077 self.finalizeAndPushToMaster()
1079 def finalizeAndPushToMaster(self):
1081 Voluntary do nothing in case of problem to avoid to trouble execution
1084 self._remote_handle.assign( pickle.dumps( self._current_instance ) )
1088 class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
1090 This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
1092 def __init__(self, handleToCentralizedInst = None):
1094 def addFreestyleAndFlush(self, value):
class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode, Generic, abc.ABC):
    """The implementation of the PyScriptNode CORBA IDL that executes a script.

    Concrete subclasses provide :meth:`executeNow`, which runs the compiled
    script either in the current process or in a crash-proof subprocess.

    NOTE(review): the source arrived mangled (file line numbers fused into the
    text, several structural lines elided). The try/except blocks, loop
    headers, ``self.code`` assignment and ``return`` statements below were
    reconstructed -- verify against upstream SALOME KERNEL before relying
    on exact control flow.
    """

    def __init__(self, nodeName, code, poa, my_container, logscript):
        """Initialize the node : compilation in the local context"""
        Generic.__init__(self, poa)
        self.nodeName = nodeName
        # Keep the raw source text: out-of-process subclasses re-execute
        # self.code in a child interpreter (see PyScriptNode_OutOfProcess_i).
        self.code = code
        self.my_container_py = my_container
        self.my_container = my_container._container
        # Register the script with linecache so tracebacks show its lines.
        linecache.cache[nodeName] = 0, None, code.split('\n'), nodeName
        self.ccode = compile(code, nodeName, 'exec')
        # Execution namespace shared by all execute* calls on this node.
        self.context = {}
        self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
        self._log_script = logscript
        self._current_execution_session = None
        sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session

    @abc.abstractmethod
    def executeNow(self, outargsname):
        """Run the compiled script; return the ScriptExecInfo of the run."""
        raise RuntimeError("Must be overloaded")

    def __del__(self):
        # force removal of self.context. Don t know why it s not done by default
        self.removeAllVarsInContext()

    def getContainer(self):
        return self.my_container

    def getCode(self):
        # NOTE(review): getter reconstructed from elided lines -- confirm
        # against the PyScriptNode IDL.
        return self.code

    def getName(self):
        return self.nodeName

    def defineNewCustomVar(self, varName, valueOfVar):
        # valueOfVar arrives pickled from the client side.
        self.context[varName] = pickle.loads(valueOfVar)

    def executeAnotherPieceOfCode(self, code):
        """Called for initialization of container lodging self."""
        try:
            ccode = compile(code, self.nodeName, 'exec')
            exec(ccode, self.context)
        except Exception:
            raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))

    def assignNewCompiledCode(self, codeStr):
        """Replace the script of this node by *codeStr* (recompiled here)."""
        try:
            self.code = codeStr
            self.ccode = compile(codeStr, self.nodeName, 'exec')
        except Exception:
            raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))

    def executeSimple(self, key, val):
        """
        Same as execute method except that no pickelization mecanism is implied here. No output is expected
        """
        try:
            # Expose the (key, val) pairs to the script under the "env" name.
            self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
            exec(self.ccode, self.context)
        except Exception:
            exc_typ,exc_val,exc_fr = sys.exc_info()
            l = traceback.format_exception(exc_typ,exc_val,exc_fr)
            print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
            raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))

    def execute(self, outargsname, argsin):
        """Execute the script stored in attribute ccode with pickled args (argsin)"""
        try:
            argsname, kws = pickle.loads(argsin)
            self.context.update(kws)
            exec(self.ccode, self.context)
            argsout = []
            for arg in outargsname:
                if arg not in self.context:
                    raise KeyError("There is no variable %s in context" % arg)
                argsout.append(self.context[arg])
            argsout = pickle.dumps(tuple(argsout),-1)
            return argsout
        except Exception:
            exc_typ,exc_val,exc_fr = sys.exc_info()
            l = traceback.format_exception(exc_typ,exc_val,exc_fr)
            print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
            raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))

    def executeFirst(self, argsin):
        """ Same than first part of self.execute to reduce memory peak."""
        def ArgInMananger(self, argsin):
            # Pull the whole byte stream then unpickle; record its size.
            argsInPy = SeqByteReceiver( argsin )
            data = argsInPy.data()
            self.addInfoOnLevel2("inputMem",len(data))
            _, kws = pickle.loads(data)
            return kws
        try:
            self.beginOfCurrentExecutionSession()
            self.addTimeInfoOnLevel2("startInputTime")
            # to force call of SeqByteReceiver's destructor
            kws = ArgInMananger(self,argsin)
            vis = InOutputObjVisitor()
            for elt in kws:
                # fetch real data if necessary
                kws[elt] = UnProxyObjectSimple( kws[elt],vis)
            self.addInfoOnLevel2("inputHDDMem",vis)
            self.context.update(kws)
            self.addTimeInfoOnLevel2("endInputTime")
        except Exception:
            exc_typ,exc_val,exc_fr = sys.exc_info()
            l = traceback.format_exception(exc_typ,exc_val,exc_fr)
            print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
            raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))

    def executeSecond(self, outargsname):
        """ Same than second part of self.execute to reduce memory peak."""
        def executeSecondInternal(monitoringtimeresms):
            # Run the script under CPU/memory monitoring; both results are
            # harvested before the context manager tears the monitor down.
            with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( monitoringtimeresms ) ) as monitoringParams:
                currentInstance = self.executeNow( outargsname )
                cpumeminfo = ReadCPUMemInfo( monitoringParams )
            return cpumeminfo, currentInstance
        try:
            self.addTimeInfoOnLevel2("startExecTime")
            self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
            cpumeminfo, self._current_execution_session._current_instance = executeSecondInternal( self.my_container_py.monitoringtimeresms() )
            self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
            self.addTimeInfoOnLevel2("endExecTime")
            self.addTimeInfoOnLevel2("startOutputTime")
            argsout = []
            for arg in outargsname:
                if arg not in self.context:
                    raise KeyError("There is no variable %s in context" % arg)
                argsout.append(self.context[arg])
            ret = []
            outputMem = 0
            vis = InOutputObjVisitor()
            for arg in argsout:
                # the proxy mecanism is catched here
                argPickle = SpoolPickleObject( arg, vis )
                retArg = SenderByte_i( self.poa,argPickle )
                id_o = self.poa.activate_object(retArg)
                retObj = self.poa.id_to_reference(id_o)
                ret.append( retObj._narrow( SALOME.SenderByte ) )
                outputMem += len(argPickle)
            self.addInfoOnLevel2("outputMem",outputMem)
            self.addInfoOnLevel2("outputHDDMem",vis)
            self.addTimeInfoOnLevel2("endOutputTime")
            self.endOfCurrentExecutionSession()
            return ret
        except Exception:
            exc_typ,exc_val,exc_fr = sys.exc_info()
            l = traceback.format_exception(exc_typ,exc_val,exc_fr)
            print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
            raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))

    def listAllVarsInContext(self):
        """Return user variables of the context (skip dunders and the container entry)."""
        import re
        pat = re.compile("^__([a-z]+)__$")
        return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]

    def removeAllVarsInContext(self):
        for elt in self.listAllVarsInContext():
            del self.context[elt]

    def getValueOfVarInContext(self, varName):
        try:
            return pickle.dumps(self.context[varName],-1)
        except Exception:
            exc_typ,exc_val,exc_fr = sys.exc_info()
            l = traceback.format_exception(exc_typ,exc_val,exc_fr)
            raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))

    def assignVarInContext(self, varName, value):
        # NOTE(review): element [0] suggests vars are stored as one-slot
        # containers (cf. callMethodOnVarInContext) -- confirm upstream.
        try:
            self.context[varName][0] = pickle.loads(value)
        except Exception:
            exc_typ,exc_val,exc_fr = sys.exc_info()
            l = traceback.format_exception(exc_typ,exc_val,exc_fr)
            raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))

    def callMethodOnVarInContext(self, varName, methodName, args):
        try:
            return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
        except Exception:
            exc_typ,exc_val,exc_fr = sys.exc_info()
            l = traceback.format_exception(exc_typ,exc_val,exc_fr)
            raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))

    def beginOfCurrentExecutionSession(self):
        # Open a fresh logging session and expose it to the executed script.
        self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
        self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session

    def endOfCurrentExecutionSession(self):
        self._current_execution_session.finalizeAndPushToMaster()
        self._current_execution_session = None

    def addInfoOnLevel2(self, key, value):
        self._current_execution_session.addInfoOnLevel2(key, value)

    def addTimeInfoOnLevel2(self, key):
        from datetime import datetime
        self._current_execution_session.addInfoOnLevel2(key,datetime.now())
class PyScriptNode_i(PyScriptNode_Abstract_i):
    """PyScriptNode variant running the compiled script inside the current process."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        # In-process execution: reuse the already-compiled code object.
        return ExecLocal(self.ccode, self.context, outargsname, self.my_container, self._current_execution_session)
class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
    """PyScriptNode variant executing the script in a crash-proof subprocess (no replay)."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        # Raw source (self.code) is shipped to the child interpreter.
        return ExecCrashProofWithoutReplay(self.code, self.context, outargsname, self.my_container, self._current_execution_session)
class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
    """PyScriptNode variant executing out of process with replay artifacts kept."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        # Raw source (self.code) is shipped to the child interpreter.
        return ExecCrashProofWithReplay(self.code, self.context, outargsname, self.my_container, self._current_execution_session)
class PyScriptNode_OutOfProcess_FT_i(PyScriptNode_Abstract_i):
    """Fault-tolerant out-of-process PyScriptNode variant (no replay)."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        # Raw source (self.code) is shipped to the child interpreter.
        return ExecCrashProofWithoutReplayFT(self.code, self.context, outargsname, self.my_container, self._current_execution_session)
class PyScriptNode_OutOfProcess_Replay_FT_i(PyScriptNode_Abstract_i):
    """Fault-tolerant out-of-process PyScriptNode variant with replay artifacts kept."""

    def __init__(self, nodeName, code, poa, my_container, logscript):
        super().__init__(nodeName, code, poa, my_container, logscript)

    def executeNow(self, outargsname):
        # Raw source (self.code) is shipped to the child interpreter.
        return ExecCrashProofWithReplayFT(self.code, self.context, outargsname, self.my_container, self._current_execution_session)