1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # File : SALOME_PyNode.py
22 # Author : Christian CAREMOLI, EDF
37 from SALOME_ContainerHelper import ScriptExecInfo
# Name under which the hosting container object is injected into executed script globals.
MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
# Name under which the per-session performance logger is injected into script globals.
MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
# Sentinel string written on stderr by the child process to mark a clean finish
# (checked by ExecCrashProofGeneric when closeEyesOnErrorAtExit is set).
MY_KEY_TO_DETECT_FINISH = "neib av tuot"
class Generic(SALOME__POA.GenericObj):
    """A Python implementation of the GenericObj CORBA IDL"""
    # NOTE(review): several method definition lines (Register/UnRegister/Destroy/__del__)
    # are not visible in this extract; only their bodies appear below.
    def __init__(self,poa):
        #print("Register called : %d"%self.cnt)
        #print("UnRegister called : %d"%self.cnt)
        # Deactivate this servant in its POA once the reference count drops.
        oid=self.poa.servant_to_id(self)
        self.poa.deactivate_object(oid)
        # Deprecated Destroy() entry point: forwarded behavior, warning kept for callers.
        print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
        #print("Destuctor called")
class PyNode_i (Engines__POA.PyNode,Generic):
    """The implementation of the PyNode CORBA IDL"""
    # NOTE(review): try/except lines around the exec/compile calls are not visible
    # in this extract; the raise statements below belong to those elided handlers.
    def __init__(self, nodeName,code,poa,my_container):
        """Initialize the node : compilation in the local context"""
        Generic.__init__(self,poa)
        self.nodeName=nodeName
        self.my_container=my_container._container
        # Register source in linecache so tracebacks show the node code.
        linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
        ccode=compile(code,nodeName,'exec')
        # Expose hosting container to the executed code under a well-known name.
        self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
        exec(ccode, self.context)
    def getContainer(self):
        return self.my_container
    def defineNewCustomVar(self,varName,valueOfVar):
        # valueOfVar arrives pickled from the client side.
        self.context[varName] = pickle.loads(valueOfVar)
    def executeAnotherPieceOfCode(self,code):
        """Called for initialization of container lodging self."""
        ccode=compile(code,self.nodeName,'exec')
        exec(ccode, self.context)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
    def execute(self,funcName,argsin):
        """Execute the function funcName found in local context with pickled args (argsin)"""
        argsin,kws=pickle.loads(argsin)
        func=self.context[funcName]
        argsout=func(*argsin,**kws)
        # Result is pickled back to the caller with the highest protocol (-1).
        argsout=pickle.dumps(argsout,-1)
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
class SenderByte_i(SALOME__POA.SenderByte,Generic):
    """CORBA servant streaming a bytes payload to remote peers by slices."""
    def __init__(self,poa,bytesToSend):
        Generic.__init__(self,poa)
        self.bytesToSend = bytesToSend
        # NOTE(review): enclosing "def getSize(self):" line is not visible in this extract.
        return len(self.bytesToSend)
    def sendPart(self,n1,n2):
        # Half-open slice [n1, n2) of the payload.
        return self.bytesToSend[n1:n2]
def IsRemote(hostName):
    """Return True when *hostName* designates a machine other than the current one."""
    localHostName = socket.gethostname()
    return localHostName != hostName
def RemoveFileSafe( fileName ):
    """Delete *fileName* when it exists; silently do nothing otherwise."""
    if not os.path.exists( fileName ):
        return
    os.unlink( fileName )
def RetrieveRemoteFileLocallyInSameFileName( remoteHostName, fileName):
    """Copy remote file *fileName* from *remoteHostName* into the same local path via scp."""
    dn = os.path.dirname( fileName )
    import subprocess as sp
    # NOTE(review): the wait/communicate on the Popen handle is not visible in this extract.
    p = sp.Popen(["scp","{}:{}".format(remoteHostName,fileName),dn])
def DestroyRemotely( remoteHostName, fileName):
    """Remove *fileName* on *remoteHostName* through a non-interactive ssh call."""
    import subprocess as sp
    # -q quiet, -C compression, batch mode so no password prompt can block.
    # NOTE(review): the wait/communicate on the Popen handle is not visible in this extract.
    p = sp.Popen(["ssh","-qC","-oStrictHostKeyChecking=no","-oBatchMode=yes",remoteHostName,"rm {}".format( fileName )])
class CopyFileFromRemoteCtxMgr:
    """Context manager fetching a remote file locally on enter and removing the local copy on exit."""
    def __init__(self, hostName, fileName):
        self._remoteHostName = hostName
        self._fileName = fileName
        self._isRemote = IsRemote( hostName )
        # NOTE(review): the "def __enter__" line and some guard bodies are not
        # visible in this extract; the lines below belong to __enter__.
        if not self._isRemote:
        dn = os.path.dirname( self._fileName )
        if not os.path.isdir( dn ):
        RetrieveRemoteFileLocallyInSameFileName(self._remoteHostName,self._fileName)
    def __exit__(self,exctype, exc, tb):
        # Only a locally-fetched copy must be cleaned up.
        if not self._isRemote:
        os.unlink( self._fileName )
class BigFileOnDiskBase(abc.ABC):
    Base class in charge of managing
    Copy or share of file accross computation Nodes
    def __init__(self, fileName):
        # Path of the file holding the dumped (pickled) payload.
        self._file_name = fileName
    def getFileName(self):
        return self._file_name
    # NOTE(review): abstractmethod decorators and the second method's "def" line
    # are not visible in this extract.
    def get(self, visitor = None):
        Method called client side of data.
        raise RuntimeError("Not implemented !")
        Method called client side of data.
        raise RuntimeError("Not implemented !")
class BigFileOnDiskShare(BigFileOnDiskBase):
    """File-backed payload on a filesystem shared by all computation nodes (protocol 0)."""
    def __init__(self, fileName):
        super().__init__( fileName )
    def get(self, visitor = None):
        return GetObjectFromFile( self._file_name, visitor )
        # NOTE(review): enclosing "def unlink" line is not visible in this extract.
        RemoveFileSafe( self._file_name )
class BigFileOnDiskSSDNoShare(BigFileOnDiskBase):
    """File-backed payload on node-local storage (protocol 1): data is scp'ed on demand."""
    def __init__(self, fileName):
        super().__init__( fileName )
        # hostname hosting data
        self._hostname = socket.gethostname()
    def get(self, visitor = None):
        # Fetch the file locally (no-op when already on the hosting node), then load it.
        with CopyFileFromRemoteCtxMgr(self._hostname, self._file_name):
            return GetObjectFromFile( self._file_name, visitor )
        # NOTE(review): enclosing "def unlink" line is not visible in this extract.
        if IsRemote( self._hostname ):
            DestroyRemotely(self._hostname,self._file_name)
            RemoveFileSafe( self._file_name )
# Maps the KernelBasis protocol id to the on-disk storage strategy class.
BigFileOnDiskClsFromProtocol = { 0 : BigFileOnDiskShare, 1 : BigFileOnDiskSSDNoShare }
# Module-level reference counter: proxy file name -> number of live references.
DicoForProxyFile = { }
def GetSizeOfBufferedReader(f):
    This method returns in bytes size of a file openned.
    f (io.IOBase): buffered reader returned by open
    # NOTE(review): lines saving the current position and returning the measured
    # size are not visible in this extract; position is restored after seeking to end.
    f.seek(0,io.SEEK_END)
    f.seek(pos,io.SEEK_SET)
def GetObjectFromFile(fname, visitor = None):
    """Unpickle and return the object stored in *fname*, reporting size/name to *visitor* if any."""
    with open(fname,"rb") as f:
        # NOTE(review): the "if visitor:" guard and the pickle.load return are not
        # visible in this extract.
        visitor.setHDDMem( GetSizeOfBufferedReader(f) )
        visitor.setFileName( fname )
def DumpInFile(obj,fname):
    """Write the already-serialized bytes *obj* into file *fname* (write line elided in this extract)."""
    with open(fname,"wb") as f:
def IncrRefInFile(fname):
    """Increment the module-level reference count for proxy file *fname*."""
    if fname in DicoForProxyFile:
        DicoForProxyFile[fname] += 1
        # NOTE(review): the "else:" line is not visible in this extract — the initial
        # count of 2 presumably accounts for server-side + client-side holders; confirm.
        DicoForProxyFile[fname] = 2
def DecrRefInFile(fname):
    :type fname: BigFileOnDiskBase
    # NOTE(review): the membership test uses the *fname* object itself as key while
    # the accesses below use fname.getFileName() — looks inconsistent; verify which
    # form is actually stored in DicoForProxyFile.
    if fname not in DicoForProxyFile:
    cnt = DicoForProxyFile[fname.getFileName()]
    DicoForProxyFile[fname.getFileName()] -= 1
    # NOTE(review): the guard deciding when to delete the entry / unlink the file is
    # not visible in this extract.
    del DicoForProxyFile[fname.getFileName()]
def GetBigObjectOnDiskThreshold():
    """Return the size threshold (bytes) above which objects are proxified to disk.

    Thin delegation to the KernelBasis configuration.
    """
    threshold = KernelBasis.GetBigObjOnDiskThreshold()
    return threshold
def ActivateProxyMecanismOrNot( sizeInByte ):
    """Tell whether a payload of *sizeInByte* bytes must be proxified to disk."""
    thres = GetBigObjectOnDiskThreshold()
    # NOTE(review): lines between the threshold fetch and this return are not visible
    # in this extract (presumably a special-case guard on the threshold value).
    return sizeInByte > thres
def GetBigObjectDirectory():
    """Return (protocol, expanded directory) used to dump big objects to disk."""
    protocol, directory = KernelBasis.GetBigObjOnDiskProtocolAndDirectory()
    # NOTE(review): the guard line preceding this raise is not visible in this extract.
    raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
    return protocol, os.path.expanduser( os.path.expandvars( directory ) )
def GetBigObjectFileName():
    Return a filename in the most secure manner (see tempfile documentation)
    protocol, directory = GetBigObjectDirectory()
    # The NamedTemporaryFile is only used to reserve a unique name; the file is
    # deleted on context exit and only its name is reused below.
    # NOTE(review): the line capturing f.name into "ret" is not visible in this extract.
    with tempfile.NamedTemporaryFile(dir = directory, prefix="mem_", suffix=".pckl") as f:
    return BigFileOnDiskClsFromProtocol[protocol]( ret )
class BigObjectOnDiskBase:
    """Client/server proxy of a pickled object dumped into a BigFileOnDisk* file."""
    def __init__(self, fileName, objSerialized):
        :param fileName: the file used to dump into.
        :param objSerialized: the object in pickeled form
        :type objSerialized: bytes
        self._filename = fileName
        # attribute _destroy is here to tell client side or server side
        # only client side can be with _destroy set to True. server side due to risk of concurrency
        # so pickled form of self must be done with this attribute set to False.
        self._destroy = False
        self.__dumpIntoFile(objSerialized)
    def getDestroyStatus(self):
        # NOTE(review): several "def" lines (incrRef/decrRef/__del__ ...) are not
        # visible in this extract; bodies below belong to those elided methods.
        IncrRefInFile( self._filename )
        # should never happen !
        # BUG(review): RuntimeError is constructed but never raised — missing "raise".
        RuntimeError("Invalid call to incrRef !")
        DecrRefInFile( self._filename )
        # should never happen !
        # BUG(review): RuntimeError is constructed but never raised — missing "raise".
        RuntimeError("Invalid call to decrRef !")
    def unlinkOnDestructor(self):
    def doNotTouchFile(self):
        Method called slave side. The life cycle management of file is client side not slave side.
        self._destroy = False
        DecrRefInFile( self._filename )
    def getFileName(self):
        return self._filename
    def __dumpIntoFile(self, objSerialized):
        DumpInFile( objSerialized, self._filename.getFileName() )
    def get(self, visitor = None):
        return self._filename.get(visitor)
        # Conversion helpers / checks below belong to elided method definitions.
        return float( self.get() )
        return int( self.get() )
        if isinstance(obj,str):
        raise RuntimeError("Not a string")
class BigObjectOnDisk(BigObjectOnDiskBase):
    """Disk-backed proxy of a single pickled object (non-sequence case)."""

    def __init__(self, fileName, objSerialized):
        # Storage and dump handling fully delegated to the base class.
        super().__init__(fileName, objSerialized)
class BigObjectOnDiskListElement(BigObjectOnDiskBase):
    """Lazy view of one element of a proxified sequence; loads the whole sequence on get()."""
    def __init__(self, pos, length, fileName):
        # Deliberately bypasses base __init__: the file already exists.
        self._filename = fileName
        self._destroy = False
        # NOTE(review): the "self._pos = pos" assignment line is not visible in this extract.
        self._length = length
    def get(self, visitor = None):
        fullObj = BigObjectOnDiskBase.get(self, visitor)
        return fullObj[ self._pos ]
    def __getitem__(self, i):
        # NOTE(review): __getitem__ body and the enclosing "def __len__" line are
        # not visible in this extract.
        return len(self.get())
class BigObjectOnDiskSequence(BigObjectOnDiskBase):
    """Disk-backed proxy of a pickled sequence; indexing returns lazy per-element views."""
    def __init__(self, length, fileName, objSerialized):
        BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
        self._length = length
    def __getitem__(self, i):
        # Element access does not load the file: a lazy element proxy is returned.
        return BigObjectOnDiskListElement(i, self._length, self.getFileName())
class BigObjectOnDiskList(BigObjectOnDiskSequence):
    """Disk-backed proxy of a pickled list."""

    def __init__(self, length, fileName, objSerialized):
        super().__init__(length, fileName, objSerialized)
class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
    """Disk-backed proxy of a pickled tuple."""

    def __init__(self, length, fileName, objSerialized):
        super().__init__(length, fileName, objSerialized)
def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
    This method return a proxy instance of pickled form of object given in input.
    obj (pickelable type) : object to be proxified
    pickleObjInit (bytes) : Optionnal. Original pickeled form of object to be proxyfied if already computed. If not this method generate it
    BigObjectOnDiskBase: proxy instance
    pickleObj = pickleObjInit
    if pickleObj is None:
        pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
    fileName = GetBigObjectFileName()
    # NOTE(review): the "if visitor:" guard, the "else:" branch line and the final
    # return of proxyObj are not visible in this extract.
    visitor.setHDDMem( len(pickleObj) )
    visitor.setFileName( fileName.getFileName() )
    # Sequences get a dedicated proxy keeping their length available without loading.
    if isinstance( obj, list):
        proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
    elif isinstance( obj, tuple):
        proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
        proxyObj = BigObjectOnDisk( fileName , pickleObj )
def SpoolPickleObject( obj, visitor = None ):
    """Pickle *obj*; when over the configured threshold, replace it by a pickled disk proxy."""
    with InOutputObjVisitorCM(visitor) as v:
        pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
        # NOTE(review): the return statements of both branches are not visible in
        # this extract.
        if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
            proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
            pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
455 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
def UnProxyObjectSimple( obj, visitor = None ):
    Method to be called in Remote mode. Alterate the obj _status attribute.
    Because the slave process does not participate in the reference counting
    visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
    with InOutputObjVisitorCM(visitor) as v:
        logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
        # NOTE(review): branch bodies (proxy dereference, list rebuild loop, default
        # return) are only partially visible in this extract.
        if isinstance(obj,BigObjectOnDiskBase):
        elif isinstance( obj, list):
            retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
def UnProxyObjectSimpleLocal( obj ):
    Method to be called in Local mode. Do not alterate the PyObj counter
    # NOTE(review): branch bodies are only partially visible in this extract.
    if isinstance(obj,BigObjectOnDiskBase):
    elif isinstance( obj, list):
        retObj.append( UnProxyObjectSimpleLocal(elt) )
    # NOTE(review): the enclosing "class FileHolder" header and the "filename"
    # property definition lines are not visible in this extract.
    def __init__(self, fileName):
        # Path of the held file; exposed through the (elided) filename property.
        self._filename = fileName
        return self._filename
class FileDeleter(FileHolder):
    """FileHolder that removes its file when the holder is finalized."""
    def __init__(self, fileName):
        super().__init__( fileName )
        # NOTE(review): the enclosing "def __del__" line is not visible in this extract.
        if os.path.exists( self._filename ):
            os.unlink( self._filename )
class MonitoringInfo:
    """Value object describing a launched monitoring subprocess (script, period, output, pid)."""
    # NOTE(review): @property / setter decorator lines and the pid getter are not
    # visible in this extract.
    def __init__(self, pyFileName, intervalInMs, outFileName, pid):
        self._py_file_name = pyFileName
        self._interval_in_ms = intervalInMs
        self._out_file_name = outFileName
    def pyFileName(self):
        return self._py_file_name
    def pid(self, value):
    def outFileName(self):
        return self._out_file_name
    def intervalInMs(self):
        return self._interval_in_ms
def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
    This method loops indefinitely every intervalInMs milliseconds to scan
    number of inodes and size of content recursively included into the in input directory.
    outFileName (str) : name of file inside the results will be written. If None a new file is generated
    See also CPUMemoryMonitoring
    dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
    # outFileNameSave stores the content of outFileName during phase of dumping
    # Temporary files only reserve unique names; they are deleted on context exit.
    with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
        outFileNameSave = f.name
    with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
    tempOutFile = outFileName
    if tempOutFile is None:
        tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
    # The template below is the source of the generated monitoring script; it is
    # filled by str.format(**locals()) — do not edit its placeholders lightly.
    with open(tempPyFile,"w") as f:
import subprocess as sp
 with open("{tempOutFile}","a") as f:
   f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
   f.write( "{{}}\\n".format( "{intervalInMs}" ) )
     nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]), ), shell = True).decode().strip()
       st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
       szOfDirStr = re.split("[\s]+",st)[0]
   f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
   f.write( "{{}}\\n".format( str( nbinodes  ) ) )
   f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
 time.sleep( {intervalInMs} / 1000.0 )
""".format( **locals()))
    logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
    pyFileName = FileDeleter( tempPyFile )
    # Auto-generated output files are cleaned up; caller-supplied ones are kept.
    if outFileName is None:
        outFileName = FileDeleter( tempOutFile )
        outFileName = FileHolder(outFileName)
    return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
    Launch a subprocess monitoring self process.
    This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
    CPU usage and RSS memory of the calling process.
    Communication between subprocess and self is done by file.
    outFileName (str) : name of file inside the results will be written. If None a new file is generated
    See also FileSystemMonitoring
    # Helper generating the psutil-based monitoring script; returns (script, output) holders.
    def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
        with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
        tempOutFile = outFileName
        if tempOutFile is None:
            tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
        # The template below is the generated script body, filled positionally by format().
        with open(tempPyFile,"w") as f:
            f.write("""import psutil
process = psutil.Process( pid )
def getChargeOf( p ):
  a,b = p.cpu_percent(), p.memory_info().rss
  for c in p.children():
    a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
 with open("{}","a") as f:
   f.write( "{{}}\\n".format( "{}" ) )
     cpu,mem_rss = getChargeOf( process )
   f.write( "{{}}\\n".format( str( cpu ) ) )
   f.write( "{{}}\\n".format( str( mem_rss  ) ) )
 time.sleep( {} / 1000.0 )
""".format(pid, tempOutFile, intervalInMs, intervalInMs))
        # Auto-generated output files are cleaned up; caller-supplied ones are kept.
        if outFileName is None:
            autoOutFile = FileDeleter(tempOutFile)
            autoOutFile = FileHolder(tempOutFile)
        return FileDeleter(tempPyFile),autoOutFile
    pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
    return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
class GenericPythonMonitoringLauncherCtxMgr:
    """Context manager starting a monitoring subprocess on enter and stopping it on exit."""
    def __init__(self, monitoringParams):
        monitoringParams (MonitoringInfo)
        self._monitoring_params = monitoringParams
        # NOTE(review): the "def __enter__" line is not visible in this extract;
        # the three lines below belong to __enter__.
        pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
        self._monitoring_params.pid = pid
        return self._monitoring_params
    def __exit__(self,exctype, exc, tb):
        StopMonitoring( self._monitoring_params )
        del self._monitoring_params
        gc.collect() # force destruction of objects even in raise context
def StopMonitoring( monitoringInfo ):
    Kill monitoring subprocess.
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    KernelBasis.StopMonitoring(monitoringInfo.pid)
    # NOTE(review): the enclosing "class CPUMemInfo" header, @property lines and
    # some method "def" lines are not visible in this extract.
    def __init__(self, intervalInMs, cpu, mem_rss):
        cpu (list<float>) CPU usage
        mem_rss (list<int>) rss memory usage
        self._interval_in_ms = intervalInMs
        # Pair CPU samples with their matching RSS samples.
        self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
        st = """Interval in ms : {self.intervalInMs}
""".format( **locals() )
    def intervalInMs(self):
        return self._interval_in_ms
        list of triplets. First param of pair is cpu usage
        Second param of pair is memory usage
def ReadCPUMemInfoInternal( fileName ):
    """Parse the monitoring output file: first line is the period, then alternating cpu/rss samples."""
    # NOTE(review): the default initialization of intervalInMs (used when the file
    # does not exist) is not visible in this extract.
    cpu = [] ; mem_rss = []
    if os.path.exists( fileName ):
        with open(fileName, "r") as f:
            coarseData = [ elt.strip() for elt in f.readlines() ]
        intervalInMs = int( coarseData[0] )
        coarseData = coarseData[1:]
        # Even entries are CPU percentages, odd entries are RSS bytes.
        cpu = [float(elt) for elt in coarseData[::2]]
        mem_rss = [ int(elt) for elt in coarseData[1::2]]
    return CPUMemInfo(intervalInMs,cpu,mem_rss)
def ReadCPUMemInfo( monitoringInfo ):
    Retrieve CPU/Mem data of monitoring.
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
    # NOTE(review): the enclosing "class InodeSizeInfo" header, @property lines and
    # some method "def" lines are not visible in this extract.
    def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
        timeStamps (list<datetimestruct>)
        volumeOfDir (list<str>)
        self._dir_name_monitored = dirNameMonitored
        self._interval_in_ms = intervalInMs
        # Triplets (timestamp, inode count, human-readable size).
        self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
        st = """Filename monitored : {self.dirNameMonitored}
Interval in ms : ${self.intervalInMs}
""".format( **locals() )
    def dirNameMonitored(self):
        return self._dir_name_monitored
    def intervalInMs(self):
        return self._interval_in_ms
        list of triplets. First param of triplet is datetimestruct
        Second param of triplet is #inodes.
        Thirst param of triplet is size.
def ReadInodeSizeInfoInternal( fileName ):
    """Parse the FS-monitoring output: header (dir, period) then repeating (timestamp, inodes, size)."""
    with open(fileName, "r") as f:
        coarseData = [ elt.strip() for elt in f.readlines() ]
    dirNameMonitored = coarseData[0] ; intervalInMs = int( coarseData[1] ) ; coarseData = coarseData[2:]
    # Entries cycle with period 3: timestamp, inode count, directory size.
    tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
    nbInodes = [int(elt) for elt in coarseData[1::3]]
    volumeOfDir = coarseData[2::3]
    return InodeSizeInfo(dirNameMonitored,intervalInMs,tss,nbInodes,volumeOfDir)
def ReadInodeSizeInfo( monitoringInfo ):
    Retrieve nb of inodes and size of monitoring
    monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
    return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
class SeqByteReceiver:
    """Client-side helper pulling a SenderByte payload, chunked when it exceeds CHUNK_SIZE."""
    # 2GB limit to trigger split into chunks
    CHUNK_SIZE = 2000000000
    # NOTE(review): the __init__ body and the "def __del__" / "def data" lines are
    # not visible in this extract.
    def __init__(self,sender):
        self._obj.UnRegister()
        size = self._obj.getSize()
        if size <= SeqByteReceiver.CHUNK_SIZE:
            return self.fetchOneShot( size )
            return self.fetchByChunks( size )
    def fetchOneShot(self,size):
        return self._obj.sendPart(0,size)
    def fetchByChunks(self,size):
        To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
        data_for_split_case = bytes(0)
        EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
        iStart = 0 ; iEnd = EFF_CHUNK_SIZE
        # Loop pulls [iStart, iEnd) windows until the whole payload is assembled;
        # iStart==iEnd marks completion once iEnd has been clamped to size.
        while iStart!=iEnd and iEnd <= size:
            part = self._obj.sendPart(iStart,iEnd)
            data_for_split_case = bytes(0).join( [data_for_split_case,part] )
            iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
        return data_for_split_case
# Template of the wrapper script executed by the crash-proof subprocess: it
# rebuilds the CORBA log handle, loads the pickled input context, runs the user
# code, then pickles the filtered output context back to the parent process.
FinalCode = """import pickle
from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
orb = CORBA.ORB_init([''])
outputFileName = "{}"
exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
with open(inputFileName,"rb") as f:
  context = pickle.load( f )
context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
with open(codeFileName,"r") as f:
exec( code , context )
# filter part of context to be exported to father process
context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
with open(outputFileName,"wb") as f:
  pickle.dump( context, f )
class PythonFunctionEvaluatorParams:
    """Bundle of the four files used by one crash-proof execution (main script, code, in/out contexts)."""
    def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
        self._main_filename = mainFileName
        self._code_filename = codeFileName
        self._in_context_filename = inContextFileName
        self._out_context_filename = outContextFileName
        # NOTE(review): the enclosing "result" property definition lines are not
        # visible in this extract.
        with open(self._out_context_filename,"rb") as f:
            return pickle.load( f )
    def destroyOnOK(self):
        # Successful run: every temporary file can go.
        for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
            if os.path.exists( fileToDestroy ):
                os.unlink( fileToDestroy )
    def destroyOnKO(self, containerRef):
        Called in the context of failure with replay mode activated
        # Only the output context is removed; replay inputs are kept and registered.
        for fileToDestroy in [self._out_context_filename]:
            if os.path.exists( fileToDestroy ):
                os.unlink( fileToDestroy )
        # register to container files group associated to the
        containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
        return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))
    def cleanOperations(self):
        return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )
    def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
        return f"return with non zero code ({returnCode})"
        if keepFilesToReplay:
            return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
{self.cleanOperations}
        return f"""return with non zero code ({returnCode})
Looks like a hard crash as returnCode {returnCode} != 0
def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay, closeEyesOnErrorAtExit):
    Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.
    code (str) : python code to be executed using context
    context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
    outargsname (list<str>) : list of arguments to be exported
    containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
    instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
    keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
    closeEyesOnErrorAtExit (bool) : if True in case of crash of subprocess, if MY_KEY_TO_DETECT_FINISH is displayed at the end of stdout
    ScriptExecInfo : instance serverside
    context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
    import subprocess as sp
    # Decides whether a run is OK: strict on returnCode, or — in "close eyes" mode —
    # accepts a crash as long as stderr ends with the finish sentinel.
    def IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr ):
        def StdErrTreatment(closeEyesOnErrorAtExit , stderr):
            if not closeEyesOnErrorAtExit:
                return stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
            return True,StdErrTreatment(closeEyesOnErrorAtExit , stderr)
        if not closeEyesOnErrorAtExit:
        if stderr[-len(MY_KEY_TO_DETECT_FINISH):] == MY_KEY_TO_DETECT_FINISH:
            return True,stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
    # Writes the code/context/main-script trio into the replay directory and runs
    # the subprocess, retrying up to KernelBasis.GetNumberOfRetry() times.
    def InternalExecResistant( code, context, outargsname):
        orb = CORBA.ORB_init([''])
        iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
        EXEC_CODE_FNAME_PXF = "execsafe_"
        def RetrieveUniquePartFromPfx( fname ):
            return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
        dirForReplayFiles = KernelBasis.GetDirectoryForReplayFiles()
        if not dirForReplayFiles:
            raise RuntimeError("You are in context of exec resistant you have to position Directory hosting these files properly")
        with tempfile.NamedTemporaryFile(dir=dirForReplayFiles,prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
            codeFd.write( "{}\n".format( containerRef.get_startup_code() ) )
            if closeEyesOnErrorAtExit:
sys.stderr.write({!r})
sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
            codeFileName = os.path.basename( codeFd.name )
        contextFileName = os.path.join( dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) ) )
        with open(contextFileName,"wb") as contextFd:
            pickle.dump( context, contextFd)
        resFileName = os.path.join( dirForReplayFiles, "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) ) )
        mainExecFileName = os.path.join( dirForReplayFiles, "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
        with open(mainExecFileName,"w") as f:
            f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
        for iTry in range( KernelBasis.GetNumberOfRetry() ):
            print( "WARNING : Retry # {}. Following code has generated non zero return code ( {} ). Trying again ... \n{}".format( iTry, returnCode, code ) )
            p = sp.Popen(["python3", mainExecFileName],cwd = dirForReplayFiles,stdout = sp.PIPE, stderr = sp.PIPE)
            stdout, stderr = p.communicate()
            returnCode = p.returncode
        return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
    ret = instanceOfLogOfCurrentSession._current_instance
    returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
    # Forward child output to this process's streams, then decide success.
    stdout = stdout.decode()
    stderr = stderr.decode()
    sys.stdout.write( stdout ) ; sys.stdout.flush()
    isOK, stderr = IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr )
    sys.stderr.write( stderr ) ; sys.stderr.flush()
    pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
    if len(pcklData) > 0:
        ret = pickle.loads( pcklData )
    context.update( evParams.result )
    evParams.destroyOnOK()
    print( "WARNING : Following code has generated non zero return code ( {} ) but considered as OK\n{}".format( returnCode, code ) )
    if keepFilesToReplay:
        evParams.destroyOnKO( containerRef )
        evParams.destroyOnOK()
    raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Crash-proof execution keeping replay files on failure; stderr is checked strictly."""
    return ExecCrashProofGeneric(
        code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
        keepFilesToReplay=True, closeEyesOnErrorAtExit=False)
def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Crash-proof execution discarding replay files on failure; stderr is checked strictly."""
    return ExecCrashProofGeneric(
        code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
        keepFilesToReplay=False, closeEyesOnErrorAtExit=False)
def ExecCrashProofWithReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Fault-tolerant variant keeping replay files: a crash is accepted when the finish sentinel is on stderr."""
    return ExecCrashProofGeneric(
        code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
        keepFilesToReplay=True, closeEyesOnErrorAtExit=True)
def ExecCrashProofWithoutReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Fault-tolerant variant discarding replay files: a crash is accepted when the finish sentinel is on stderr."""
    return ExecCrashProofGeneric(
        code, context, outargsname, containerRef, instanceOfLogOfCurrentSession,
        keepFilesToReplay=False, closeEyesOnErrorAtExit=True)
def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
    """Execute *code* in-process inside *context* (mutating it) and return the session's ScriptExecInfo.

    outargsname and containerRef are unused here; they exist so that the signature
    matches the ExecCrashProof* family of execution strategies.
    """
    exec( code, context )
    currentInstance = instanceOfLogOfCurrentSession._current_instance
    return currentInstance
class LogOfCurrentExecutionSessionAbs(abc.ABC):
    """Base of per-session execution loggers accumulating data into a ScriptExecInfo."""
    # NOTE(review): the "def __init__" line is not visible in this extract.
        self._current_instance = ScriptExecInfo()
    def addInfoOnLevel2(self, key, value):
        # Generic setter: stores the measurement as an attribute of the ScriptExecInfo.
        setattr(self._current_instance,key,value)
    def addFreestyleAndFlush(self, value):
        raise RuntimeError("Must be overloaded")
class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
    """Logger pushing the pickled ScriptExecInfo to a centralized remote handle."""
    def __init__(self, handleToCentralizedInst):
        self._remote_handle = handleToCentralizedInst
    def addFreestyleAndFlush(self, value):
        self._current_instance.freestyle = value
        self.finalizeAndPushToMaster()
    def finalizeAndPushToMaster(self):
        Voluntary do nothing in case of problem to avoid to trouble execution
        # NOTE(review): the try/except wrapping this remote assign is not visible
        # in this extract.
        self._remote_handle.assign( pickle.dumps( self._current_instance ) )
class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
    This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
    # NOTE(review): method bodies (no-ops in a stub, presumably) are not visible
    # in this extract.
    def __init__(self, handleToCentralizedInst = None):
    def addFreestyleAndFlush(self, value):
class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
    """The implementation of the PyScriptNode CORBA IDL that executes a script"""
    # NOTE(review): this class continues past the end of this extract; some "def"
    # lines and try/except headers inside it are not visible.
    def __init__(self, nodeName, code, poa, my_container, logscript):
        """Initialize the node : compilation in the local context"""
        Generic.__init__(self,poa)
        self.nodeName=nodeName
        self.my_container_py = my_container
        self.my_container=my_container._container
        # Register source in linecache so tracebacks show the node code.
        linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
        self.ccode=compile(code,nodeName,'exec')
        self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
        self._log_script = logscript
        self._current_execution_session = None
        sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session
    def executeNow(self, outargsname):
        raise RuntimeError("Must be overloaded")
        # force removal of self.context. Don t know why it s not done by default
        self.removeAllVarsInContext()
    def getContainer(self):
        return self.my_container
        # NOTE(review): enclosing "def getName" line is not visible in this extract.
        return self.nodeName
    def defineNewCustomVar(self,varName,valueOfVar):
        self.context[varName] = pickle.loads(valueOfVar)
    def executeAnotherPieceOfCode(self,code):
        """Called for initialization of container lodging self."""
        ccode=compile(code,self.nodeName,'exec')
        exec(ccode, self.context)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
    def assignNewCompiledCode(self,codeStr):
        self.ccode=compile(codeStr,self.nodeName,'exec')
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))
    def executeSimple(self, key, val):
        Same as execute method except that no pickelization mecanism is implied here. No output is expected
        self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
        exec(self.ccode,self.context)
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))
    def execute(self,outargsname,argsin):
        """Execute the script stored in attribute ccode with pickled args (argsin)"""
        argsname,kws=pickle.loads(argsin)
        self.context.update(kws)
        exec(self.ccode, self.context)
        # Collect requested outputs from the (now updated) execution context.
        for arg in outargsname:
            if arg not in self.context:
                raise KeyError("There is no variable %s in context" % arg)
            argsout.append(self.context[arg])
        argsout=pickle.dumps(tuple(argsout),-1)
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))
1148 def executeFirst(self,argsin):
1149 """ Same than first part of self.execute to reduce memory peak."""
1150 def ArgInMananger(self,argsin):
1151 argsInPy = SeqByteReceiver( argsin )
1152 data = argsInPy.data()
1153 self.addInfoOnLevel2("inputMem",len(data))
1154 _,kws=pickle.loads(data)
1157 self.beginOfCurrentExecutionSession()
1158 self.addTimeInfoOnLevel2("startInputTime")
1159 # to force call of SeqByteReceiver's destructor
1160 kws = ArgInMananger(self,argsin)
1161 vis = InOutputObjVisitor()
1163 # fetch real data if necessary
1164 kws[elt] = UnProxyObjectSimple( kws[elt],vis)
1165 self.addInfoOnLevel2("inputHDDMem",vis)
1166 self.context.update(kws)
1167 self.addTimeInfoOnLevel2("endInputTime")
1169 exc_typ,exc_val,exc_fr=sys.exc_info()
1170 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1171 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1172 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))
1174 def executeSecond(self,outargsname):
1175 """ Same than second part of self.execute to reduce memory peak."""
1176 def executeSecondInternal(monitoringtimeresms):
1177 with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( monitoringtimeresms ) ) as monitoringParams:
1178 currentInstance = self.executeNow( outargsname )
1179 cpumeminfo = ReadCPUMemInfo( monitoringParams )
1180 return cpumeminfo, currentInstance
1184 self.addTimeInfoOnLevel2("startExecTime")
1186 self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
1187 cpumeminfo, self._current_execution_session._current_instance = executeSecondInternal( self.my_container_py.monitoringtimeresms() )
1189 self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
1190 self.addTimeInfoOnLevel2("endExecTime")
1191 self.addTimeInfoOnLevel2("startOutputTime")
1193 for arg in outargsname:
1194 if arg not in self.context:
1195 raise KeyError("There is no variable %s in context" % arg)
1196 argsout.append(self.context[arg])
1199 vis = InOutputObjVisitor()
1201 # the proxy mecanism is catched here
1202 argPickle = SpoolPickleObject( arg, vis )
1203 retArg = SenderByte_i( self.poa,argPickle )
1204 id_o = self.poa.activate_object(retArg)
1205 retObj = self.poa.id_to_reference(id_o)
1206 ret.append( retObj._narrow( SALOME.SenderByte ) )
1207 outputMem += len(argPickle)
1208 self.addInfoOnLevel2("outputMem",outputMem)
1209 self.addInfoOnLevel2("outputHDDMem",vis)
1210 self.addTimeInfoOnLevel2("endOutputTime")
1211 self.endOfCurrentExecutionSession()
1214 exc_typ,exc_val,exc_fr=sys.exc_info()
1215 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1216 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1217 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))
1219 def listAllVarsInContext(self):
1221 pat = re.compile("^__([a-z]+)__$")
1222 return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]
1224 def removeAllVarsInContext(self):
1225 for elt in self.listAllVarsInContext():
1226 del self.context[elt]
1228 def getValueOfVarInContext(self,varName):
1230 return pickle.dumps(self.context[varName],-1)
1232 exc_typ,exc_val,exc_fr=sys.exc_info()
1233 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1234 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1237 def assignVarInContext(self, varName, value):
1239 self.context[varName][0] = pickle.loads(value)
1241 exc_typ,exc_val,exc_fr=sys.exc_info()
1242 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1243 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1246 def callMethodOnVarInContext(self, varName, methodName, args):
1248 return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
1250 exc_typ,exc_val,exc_fr=sys.exc_info()
1251 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1252 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1255 def beginOfCurrentExecutionSession(self):
1256 self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
1257 self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session
1259 def endOfCurrentExecutionSession(self):
1260 self._current_execution_session.finalizeAndPushToMaster()
1261 self._current_execution_session = None
1263 def addInfoOnLevel2(self, key, value):
1264 self._current_execution_session.addInfoOnLevel2(key, value)
1266 def addTimeInfoOnLevel2(self, key):
1267 from datetime import datetime
1268 self._current_execution_session.addInfoOnLevel2(key,datetime.now())
class PyScriptNode_i(PyScriptNode_Abstract_i):
  """PyScriptNode variant running the compiled script in the current process."""

  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # In-process strategy: hand the already-compiled code object to ExecLocal.
    session = self._current_execution_session
    return ExecLocal(self.ccode, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
  """PyScriptNode variant delegating execution to ExecCrashProofWithoutReplay (out-of-process strategy)."""

  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # Out-of-process strategy: the raw source (self.code) is handed over,
    # not the compiled object.
    session = self._current_execution_session
    return ExecCrashProofWithoutReplay(self.code, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
  """PyScriptNode variant delegating execution to ExecCrashProofWithReplay (out-of-process, replay-enabled strategy)."""

  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # Delegate with the raw source (self.code), as the helper recompiles it
    # in the child process.
    session = self._current_execution_session
    return ExecCrashProofWithReplay(self.code, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_FT_i(PyScriptNode_Abstract_i):
  """PyScriptNode variant delegating execution to ExecCrashProofWithoutReplayFT (fault-tolerant out-of-process strategy)."""

  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # Same contract as the sibling executeNow implementations; only the
    # crash-proof helper differs.
    session = self._current_execution_session
    return ExecCrashProofWithoutReplayFT(self.code, self.context, outargsname, self.my_container, session)
class PyScriptNode_OutOfProcess_Replay_FT_i(PyScriptNode_Abstract_i):
  """PyScriptNode variant delegating execution to ExecCrashProofWithReplayFT (fault-tolerant, replay-enabled strategy)."""

  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # Same contract as the sibling executeNow implementations; only the
    # crash-proof helper differs.
    session = self._current_execution_session
    return ExecCrashProofWithReplayFT(self.code, self.context, outargsname, self.my_container, session)