1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # File : SALOME_PyNode.py
22 # Author : Christian CAREMOLI, EDF
37 from SALOME_ContainerHelper import ScriptExecInfo
39 MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
41 MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
43 MY_KEY_TO_DETECT_FINISH = "neib av tuot"
45 class Generic(SALOME__POA.GenericObj):
46 """A Python implementation of the GenericObj CORBA IDL"""
47 def __init__(self,poa):
52 #print("Register called : %d"%self.cnt)
56 #print("UnRegister called : %d"%self.cnt)
59 oid=self.poa.servant_to_id(self)
60 self.poa.deactivate_object(oid)
63 print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
67 #print("Destuctor called")
70 class PyNode_i (Engines__POA.PyNode,Generic):
71 """The implementation of the PyNode CORBA IDL"""
72 def __init__(self, nodeName,code,poa,my_container):
73 """Initialize the node : compilation in the local context"""
74 Generic.__init__(self,poa)
75 self.nodeName=nodeName
77 self.my_container=my_container._container
78 linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
79 ccode=compile(code,nodeName,'exec')
81 self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
82 exec(ccode, self.context)
84 def getContainer(self):
85 return self.my_container
93 def defineNewCustomVar(self,varName,valueOfVar):
94 self.context[varName] = pickle.loads(valueOfVar)
97 def executeAnotherPieceOfCode(self,code):
98 """Called for initialization of container lodging self."""
100 ccode=compile(code,self.nodeName,'exec')
101 exec(ccode, self.context)
103 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
105 def execute(self,funcName,argsin):
106 """Execute the function funcName found in local context with pickled args (argsin)"""
108 argsin,kws=pickle.loads(argsin)
109 func=self.context[funcName]
110 argsout=func(*argsin,**kws)
111 argsout=pickle.dumps(argsout,-1)
114 exc_typ,exc_val,exc_fr=sys.exc_info()
115 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
116 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
118 class SenderByte_i(SALOME__POA.SenderByte,Generic):
119 def __init__(self,poa,bytesToSend):
120 Generic.__init__(self,poa)
121 self.bytesToSend = bytesToSend
124 return len(self.bytesToSend)
126 def sendPart(self,n1,n2):
127 return self.bytesToSend[n1:n2]
129 def IsRemote(hostName):
131 return socket.gethostname() != hostName
133 def RemoveFileSafe( fileName ):
134 if os.path.exists( fileName ):
135 os.unlink( fileName )
137 def RetrieveRemoteFileLocallyInSameFileName( remoteHostName, fileName):
139 dn = os.path.dirname( fileName )
140 import subprocess as sp
141 p = sp.Popen(["scp","{}:{}".format(remoteHostName,fileName),dn])
144 def DestroyRemotely( remoteHostName, fileName):
145 import subprocess as sp
146 p = sp.Popen(["ssh","-qC","-oStrictHostKeyChecking=no","-oBatchMode=yes",remoteHostName,"rm {}".format( fileName )])
149 class CopyFileFromRemoteCtxMgr:
150 def __init__(self, hostName, fileName):
151 self._remoteHostName = hostName
152 self._fileName = fileName
153 self._isRemote = IsRemote( hostName )
156 if not self._isRemote:
158 dn = os.path.dirname( self._fileName )
159 if not os.path.isdir( dn ):
161 RetrieveRemoteFileLocallyInSameFileName(self._remoteHostName,self._fileName)
163 def __exit__(self,exctype, exc, tb):
164 if not self._isRemote:
166 os.unlink( self._fileName )
168 class BigFileOnDiskBase(abc.ABC):
170 Base class in charge of managing
171 Copy or share of file accross computation Nodes
173 def __init__(self, fileName):
174 self._file_name = fileName
176 def getFileName(self):
177 return self._file_name
180 def get(self, visitor = None):
182 Method called client side of data.
184 raise RuntimeError("Not implemented !")
189 Method called client side of data.
191 raise RuntimeError("Not implemented !")
194 class BigFileOnDiskShare(BigFileOnDiskBase):
195 def __init__(self, fileName):
196 super().__init__( fileName )
198 def get(self, visitor = None):
199 return GetObjectFromFile( self._file_name, visitor )
202 RemoveFileSafe( self._file_name )
204 class BigFileOnDiskSSDNoShare(BigFileOnDiskBase):
205 def __init__(self, fileName):
207 super().__init__( fileName )
208 # hostname hosting data
209 self._hostname = socket.gethostname()
211 def get(self, visitor = None):
212 with CopyFileFromRemoteCtxMgr(self._hostname, self._file_name):
213 return GetObjectFromFile( self._file_name, visitor )
216 if IsRemote( self._hostname ):
217 DestroyRemotely(self._hostname,self._file_name)
219 RemoveFileSafe( self._file_name )
221 BigFileOnDiskClsFromProtocol = { 0 : BigFileOnDiskShare, 1 : BigFileOnDiskSSDNoShare }
223 DicoForProxyFile = { }
225 def GetSizeOfBufferedReader(f):
227 This method returns in bytes size of a file openned.
231 f (io.IOBase): buffered reader returned by open
239 f.seek(0,io.SEEK_END)
241 f.seek(pos,io.SEEK_SET)
244 def GetObjectFromFile(fname, visitor = None):
245 with open(fname,"rb") as f:
247 visitor.setHDDMem( GetSizeOfBufferedReader(f) )
248 visitor.setFileName( fname )
252 def DumpInFile(obj,fname):
253 with open(fname,"wb") as f:
256 def IncrRefInFile(fname):
261 if fname in DicoForProxyFile:
262 DicoForProxyFile[fname] += 1
264 DicoForProxyFile[fname] = 2
267 def DecrRefInFile(fname):
270 :type fname: BigFileOnDiskBase
272 if fname not in DicoForProxyFile:
275 cnt = DicoForProxyFile[fname.getFileName()]
276 DicoForProxyFile[fname.getFileName()] -= 1
278 del DicoForProxyFile[fname.getFileName()]
283 def GetBigObjectOnDiskThreshold():
284 return KernelBasis.GetBigObjOnDiskThreshold()
286 def ActivateProxyMecanismOrNot( sizeInByte ):
287 thres = GetBigObjectOnDiskThreshold()
291 return sizeInByte > thres
293 def GetBigObjectDirectory():
295 protocol, directory = KernelBasis.GetBigObjOnDiskProtocolAndDirectory()
297 raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
298 return protocol, os.path.expanduser( os.path.expandvars( directory ) )
300 def GetBigObjectFileName():
302 Return a filename in the most secure manner (see tempfile documentation)
305 protocol, directory = GetBigObjectDirectory()
306 with tempfile.NamedTemporaryFile(dir = directory, prefix="mem_", suffix=".pckl") as f:
308 return BigFileOnDiskClsFromProtocol[protocol]( ret )
310 class BigObjectOnDiskBase:
311 def __init__(self, fileName, objSerialized):
313 :param fileName: the file used to dump into.
314 :param objSerialized: the object in pickeled form
315 :type objSerialized: bytes
317 self._filename = fileName
318 # attribute _destroy is here to tell client side or server side
319 # only client side can be with _destroy set to True. server side due to risk of concurrency
320 # so pickled form of self must be done with this attribute set to False.
321 self._destroy = False
322 self.__dumpIntoFile(objSerialized)
324 def getDestroyStatus(self):
329 IncrRefInFile( self._filename )
331 # should never happen !
332 RuntimeError("Invalid call to incrRef !")
336 DecrRefInFile( self._filename )
338 # should never happen !
339 RuntimeError("Invalid call to decrRef !")
341 def unlinkOnDestructor(self):
344 def doNotTouchFile(self):
346 Method called slave side. The life cycle management of file is client side not slave side.
348 self._destroy = False
352 DecrRefInFile( self._filename )
354 def getFileName(self):
355 return self._filename
357 def __dumpIntoFile(self, objSerialized):
358 DumpInFile( objSerialized, self._filename.getFileName() )
360 def get(self, visitor = None):
361 return self._filename.get(visitor)
364 return float( self.get() )
367 return int( self.get() )
371 if isinstance(obj,str):
374 raise RuntimeError("Not a string")
376 class BigObjectOnDisk(BigObjectOnDiskBase):
377 def __init__(self, fileName, objSerialized):
378 BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
380 class BigObjectOnDiskListElement(BigObjectOnDiskBase):
381 def __init__(self, pos, length, fileName):
382 self._filename = fileName
383 self._destroy = False
385 self._length = length
387 def get(self, visitor = None):
388 fullObj = BigObjectOnDiskBase.get(self, visitor)
389 return fullObj[ self._pos ]
391 def __getitem__(self, i):
395 return len(self.get())
397 class BigObjectOnDiskSequence(BigObjectOnDiskBase):
398 def __init__(self, length, fileName, objSerialized):
399 BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
400 self._length = length
402 def __getitem__(self, i):
403 return BigObjectOnDiskListElement(i, self._length, self.getFileName())
408 class BigObjectOnDiskList(BigObjectOnDiskSequence):
409 def __init__(self, length, fileName, objSerialized):
410 BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
412 class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
413 def __init__(self, length, fileName, objSerialized):
414 BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
416 def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
418 This method return a proxy instance of pickled form of object given in input.
422 obj (pickelable type) : object to be proxified
423 pickleObjInit (bytes) : Optionnal. Original pickeled form of object to be proxyfied if already computed. If not this method generate it
427 BigObjectOnDiskBase: proxy instance
429 pickleObj = pickleObjInit
430 if pickleObj is None:
431 pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
432 fileName = GetBigObjectFileName()
434 visitor.setHDDMem( len(pickleObj) )
435 visitor.setFileName( fileName.getFileName() )
436 if isinstance( obj, list):
437 proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
438 elif isinstance( obj, tuple):
439 proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
441 proxyObj = BigObjectOnDisk( fileName , pickleObj )
444 def SpoolPickleObject( obj, visitor = None ):
446 with InOutputObjVisitorCM(visitor) as v:
447 pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
448 if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
451 proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
452 pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
455 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
457 def UnProxyObjectSimple( obj, visitor = None ):
459 Method to be called in Remote mode. Alterate the obj _status attribute.
460 Because the slave process does not participate in the reference counting
464 visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
467 with InOutputObjVisitorCM(visitor) as v:
468 logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
469 if isinstance(obj,BigObjectOnDiskBase):
472 elif isinstance( obj, list):
475 retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
480 def UnProxyObjectSimpleLocal( obj ):
482 Method to be called in Local mode. Do not alterate the PyObj counter
484 if isinstance(obj,BigObjectOnDiskBase):
486 elif isinstance( obj, list):
489 retObj.append( UnProxyObjectSimpleLocal(elt) )
495 def __init__(self, fileName):
496 self._filename = fileName
499 return self._filename
501 class FileDeleter(FileHolder):
502 def __init__(self, fileName):
503 super().__init__( fileName )
506 if os.path.exists( self._filename ):
507 os.unlink( self._filename )
509 class MonitoringInfo:
510 def __init__(self, pyFileName, intervalInMs, outFileName, pid):
511 self._py_file_name = pyFileName
512 self._interval_in_ms = intervalInMs
513 self._out_file_name = outFileName
517 def pyFileName(self):
518 return self._py_file_name
525 def pid(self, value):
529 def outFileName(self):
530 return self._out_file_name
533 def intervalInMs(self):
534 return self._interval_in_ms
536 def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
538 This method loops indefinitely every intervalInMs milliseconds to scan
539 number of inodes and size of content recursively included into the in input directory.
544 outFileName (str) : name of file inside the results will be written. If None a new file is generated
546 See also CPUMemoryMonitoring
550 dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
554 # outFileNameSave stores the content of outFileName during phase of dumping
555 with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
556 outFileNameSave = f.name
557 with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
559 tempOutFile = outFileName
560 if tempOutFile is None:
561 tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
562 with open(tempPyFile,"w") as f:
564 import subprocess as sp
569 with open("{tempOutFile}","a") as f:
570 f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
571 f.write( "{{}}\\n".format( "{intervalInMs}" ) )
575 nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]), ), shell = True).decode().strip()
580 st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
581 szOfDirStr = re.split("[\s]+",st)[0]
584 f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
585 f.write( "{{}}\\n".format( str( nbinodes ) ) )
586 f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
588 time.sleep( {intervalInMs} / 1000.0 )
589 """.format( **locals()))
590 logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
591 pyFileName = FileDeleter( tempPyFile )
592 if outFileName is None:
593 outFileName = FileDeleter( tempOutFile )
595 outFileName = FileHolder(outFileName)
596 return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
598 def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
600 Launch a subprocess monitoring self process.
601 This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
602 CPU usage and RSS memory of the calling process.
603 Communication between subprocess and self is done by file.
607 outFileName (str) : name of file inside the results will be written. If None a new file is generated
609 See also FileSystemMonitoring
612 def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
615 with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
617 tempOutFile = outFileName
618 if tempOutFile is None:
619 tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
621 with open(tempPyFile,"w") as f:
622 f.write("""import psutil
624 process = psutil.Process( pid )
625 def getChargeOf( p ):
626 a,b = p.cpu_percent(), p.memory_info().rss
628 for c in p.children():
629 a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
634 with open("{}","a") as f:
635 f.write( "{{}}\\n".format( "{}" ) )
637 cpu,mem_rss = getChargeOf( process )
638 f.write( "{{}}\\n".format( str( cpu ) ) )
639 f.write( "{{}}\\n".format( str( mem_rss ) ) )
641 time.sleep( {} / 1000.0 )
642 """.format(pid, tempOutFile, intervalInMs, intervalInMs))
643 if outFileName is None:
644 autoOutFile = FileDeleter(tempOutFile)
646 autoOutFile = FileHolder(tempOutFile)
647 return FileDeleter(tempPyFile),autoOutFile
648 pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
649 return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
651 class GenericPythonMonitoringLauncherCtxMgr:
652 def __init__(self, monitoringParams):
656 monitoringParams (MonitoringInfo)
658 self._monitoring_params = monitoringParams
661 pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
662 self._monitoring_params.pid = pid
663 return self._monitoring_params
665 def __exit__(self,exctype, exc, tb):
666 StopMonitoring( self._monitoring_params )
667 del self._monitoring_params
669 gc.collect() # force destruction of objects even in raise context
671 def StopMonitoring( monitoringInfo ):
673 Kill monitoring subprocess.
677 monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
680 KernelBasis.StopMonitoring(monitoringInfo.pid)
683 def __init__(self, intervalInMs, cpu, mem_rss):
688 cpu (list<float>) CPU usage
689 mem_rss (list<int>) rss memory usage
691 self._interval_in_ms = intervalInMs
692 self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
694 st = """Interval in ms : {self.intervalInMs}
696 """.format( **locals() )
699 def intervalInMs(self):
700 return self._interval_in_ms
704 list of triplets. First param of pair is cpu usage
705 Second param of pair is memory usage
709 def ReadCPUMemInfoInternal( fileName ):
711 cpu = [] ; mem_rss = []
712 if os.path.exists( fileName ):
714 with open(fileName, "r") as f:
715 coarseData = [ elt.strip() for elt in f.readlines() ]
716 intervalInMs = int( coarseData[0] )
717 coarseData = coarseData[1:]
718 cpu = [float(elt) for elt in coarseData[::2]]
719 mem_rss = [ int(elt) for elt in coarseData[1::2]]
722 return CPUMemInfo(intervalInMs,cpu,mem_rss)
724 def ReadCPUMemInfo( monitoringInfo ):
726 Retrieve CPU/Mem data of monitoring.
730 monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
736 return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
739 def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
743 timeStamps (list<datetimestruct>)
745 volumeOfDir (list<str>)
747 self._dir_name_monitored = dirNameMonitored
748 self._interval_in_ms = intervalInMs
749 self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
751 st = """Filename monitored : {self.dirNameMonitored}
752 Interval in ms : ${self.intervalInMs}
754 """.format( **locals() )
757 def dirNameMonitored(self):
758 return self._dir_name_monitored
760 def intervalInMs(self):
761 return self._interval_in_ms
765 list of triplets. First param of triplet is datetimestruct
766 Second param of triplet is #inodes.
767 Thirst param of triplet is size.
771 def ReadInodeSizeInfoInternal( fileName ):
774 with open(fileName, "r") as f:
775 coarseData = [ elt.strip() for elt in f.readlines() ]
776 dirNameMonitored = coarseData[0] ; intervalInMs = int( coarseData[1] ) ; coarseData = coarseData[2:]
777 tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
778 nbInodes = [int(elt) for elt in coarseData[1::3]]
779 volumeOfDir = coarseData[2::3]
780 return InodeSizeInfo(dirNameMonitored,intervalInMs,tss,nbInodes,volumeOfDir)
782 def ReadInodeSizeInfo( monitoringInfo ):
784 Retrieve nb of inodes and size of monitoring
788 monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
794 return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
796 class SeqByteReceiver:
797 # 2GB limit to trigger split into chunks
798 CHUNK_SIZE = 2000000000
799 def __init__(self,sender):
802 self._obj.UnRegister()
805 size = self._obj.getSize()
806 if size <= SeqByteReceiver.CHUNK_SIZE:
807 return self.fetchOneShot( size )
809 return self.fetchByChunks( size )
810 def fetchOneShot(self,size):
811 return self._obj.sendPart(0,size)
812 def fetchByChunks(self,size):
814 To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
816 data_for_split_case = bytes(0)
817 EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
818 iStart = 0 ; iEnd = EFF_CHUNK_SIZE
819 while iStart!=iEnd and iEnd <= size:
820 part = self._obj.sendPart(iStart,iEnd)
821 data_for_split_case = bytes(0).join( [data_for_split_case,part] )
822 iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
823 return data_for_split_case
825 FinalCode = """import pickle
826 from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
829 orb = CORBA.ORB_init([''])
832 outputFileName = "{}"
834 exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
835 with open(inputFileName,"rb") as f:
836 context = pickle.load( f )
837 context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
838 with open(codeFileName,"r") as f:
841 exec( code , context )
842 # filter part of context to be exported to father process
843 context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
845 with open(outputFileName,"wb") as f:
846 pickle.dump( context, f )
849 class PythonFunctionEvaluatorParams:
850 def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
851 self._main_filename = mainFileName
852 self._code_filename = codeFileName
853 self._in_context_filename = inContextFileName
854 self._out_context_filename = outContextFileName
858 with open(self._out_context_filename,"rb") as f:
859 return pickle.load( f )
860 def destroyOnOK(self):
861 for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
862 if os.path.exists( fileToDestroy ):
863 os.unlink( fileToDestroy )
864 def destroyOnKO(self, containerRef):
866 Called in the context of failure with replay mode activated
868 for fileToDestroy in [self._out_context_filename]:
869 if os.path.exists( fileToDestroy ):
870 os.unlink( fileToDestroy )
871 # register to container files group associated to the
872 containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
875 return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))
878 def cleanOperations(self):
880 return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )
882 def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
884 return f"return with non zero code ({returnCode})"
887 if keepFilesToReplay:
888 return f"""return with non zero code ({returnCode})
890 Looks like a hard crash as returnCode {returnCode} != 0
892 {self.cleanOperations}
896 return f"""return with non zero code ({returnCode})
898 Looks like a hard crash as returnCode {returnCode} != 0
902 def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay, closeEyesOnErrorAtExit):
904 Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.
909 code (str) : python code to be executed using context
910 context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
911 outargsname (list<str>) : list of arguments to be exported
912 containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
913 instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
914 keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
915 closeEyesOnErrorAtExit (bool) : if True in case of crash of subprocess, if MY_KEY_TO_DETECT_FINISH is displayed at the end of stdout
920 ScriptExecInfo : instance serverside
925 context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
929 import subprocess as sp
932 def IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr ):
933 def StdErrTreatment(closeEyesOnErrorAtExit , stderr):
934 if not closeEyesOnErrorAtExit:
937 return stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
939 return True,StdErrTreatment(closeEyesOnErrorAtExit , stderr)
940 if not closeEyesOnErrorAtExit:
942 if stderr[-len(MY_KEY_TO_DETECT_FINISH):] == MY_KEY_TO_DETECT_FINISH:
943 return True,stderr[:-len(MY_KEY_TO_DETECT_FINISH)]
948 def InternalExecResistant( code, context, outargsname):
950 orb = CORBA.ORB_init([''])
951 iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
953 EXEC_CODE_FNAME_PXF = "execsafe_"
954 def RetrieveUniquePartFromPfx( fname ):
955 return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
956 with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
957 codeFd.write( "{}\n".format( containerRef.get_startup_code() ) )
959 if closeEyesOnErrorAtExit:
962 sys.stderr.write({!r})
963 sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
965 codeFileName = os.path.basename( codeFd.name )
966 contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
967 with open(contextFileName,"wb") as contextFd:
968 pickle.dump( context, contextFd)
969 resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) )
970 mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
971 with open(mainExecFileName,"w") as f:
972 f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
973 for iTry in range( KernelBasis.GetNumberOfRetry() ):
975 print( "WARNING : Retry # {}. Following code has generated non zero return code ( {} ). Trying again ... \n{}".format( iTry, returnCode, code ) )
976 p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
977 stdout, stderr = p.communicate()
978 returnCode = p.returncode
981 return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
982 ret = instanceOfLogOfCurrentSession._current_instance
983 returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
984 stdout = stdout.decode()
985 stderr = stderr.decode()
986 sys.stdout.write( stdout ) ; sys.stdout.flush()
987 isOK, stderr = IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr )
988 sys.stderr.write( stderr ) ; sys.stderr.flush()
990 pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
991 if len(pcklData) > 0:
992 ret = pickle.loads( pcklData )
993 context.update( evParams.result )
994 evParams.destroyOnOK()
996 print( "WARNING : Following code has generated non zero return code ( {} ) but considered as OK\n{}".format( returnCode, code ) )
999 if keepFilesToReplay:
1000 evParams.destroyOnKO( containerRef )
1002 evParams.destroyOnOK()
1003 raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
1005 def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
1006 return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True, False)
1008 def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
1009 return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False, False)
1011 def ExecCrashProofWithReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
1012 return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True, True)
1014 def ExecCrashProofWithoutReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
1015 return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False, True)
1017 def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
1018 exec( code, context )
1019 return instanceOfLogOfCurrentSession._current_instance
1021 class LogOfCurrentExecutionSessionAbs(abc.ABC):
1023 self._current_instance = ScriptExecInfo()
1025 def addInfoOnLevel2(self, key, value):
1026 setattr(self._current_instance,key,value)
1029 def addFreestyleAndFlush(self, value):
1030 raise RuntimeError("Must be overloaded")
1032 class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
1033 def __init__(self, handleToCentralizedInst):
1035 self._remote_handle = handleToCentralizedInst
1037 def addFreestyleAndFlush(self, value):
1038 self._current_instance.freestyle = value
1039 self.finalizeAndPushToMaster()
1041 def finalizeAndPushToMaster(self):
1043 Voluntary do nothing in case of problem to avoid to trouble execution
1046 self._remote_handle.assign( pickle.dumps( self._current_instance ) )
1050 class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
1052 This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
1054 def __init__(self, handleToCentralizedInst = None):
1056 def addFreestyleAndFlush(self, value):
1059 class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
1060 """The implementation of the PyScriptNode CORBA IDL that executes a script"""
1061 def __init__(self, nodeName, code, poa, my_container, logscript):
1062 """Initialize the node : compilation in the local context"""
1063 Generic.__init__(self,poa)
1064 self.nodeName=nodeName
1066 self.my_container_py = my_container
1067 self.my_container=my_container._container
1068 linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
1069 self.ccode=compile(code,nodeName,'exec')
1071 self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
1072 self._log_script = logscript
1073 self._current_execution_session = None
1074 sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session
1077 def executeNow(self, outargsname):
1078 raise RuntimeError("Must be overloaded")
1081 # force removal of self.context. Don t know why it s not done by default
1082 self.removeAllVarsInContext()
1085 def getContainer(self):
1086 return self.my_container
1092 return self.nodeName
1094 def defineNewCustomVar(self,varName,valueOfVar):
1095 self.context[varName] = pickle.loads(valueOfVar)
1098 def executeAnotherPieceOfCode(self,code):
1099 """Called for initialization of container lodging self."""
1101 ccode=compile(code,self.nodeName,'exec')
1102 exec(ccode, self.context)
1104 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
1106 def assignNewCompiledCode(self,codeStr):
1109 self.ccode=compile(codeStr,self.nodeName,'exec')
1111 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))
1113 def executeSimple(self, key, val):
1115 Same as execute method except that no pickelization mecanism is implied here. No output is expected
1118 self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
1119 exec(self.ccode,self.context)
1121 exc_typ,exc_val,exc_fr=sys.exc_info()
1122 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1123 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1124 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))
1126 def execute(self,outargsname,argsin):
1127 """Execute the script stored in attribute ccode with pickled args (argsin)"""
1129 argsname,kws=pickle.loads(argsin)
1130 self.context.update(kws)
1131 exec(self.ccode, self.context)
1133 for arg in outargsname:
1134 if arg not in self.context:
1135 raise KeyError("There is no variable %s in context" % arg)
1136 argsout.append(self.context[arg])
1137 argsout=pickle.dumps(tuple(argsout),-1)
1140 exc_typ,exc_val,exc_fr=sys.exc_info()
1141 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1142 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1143 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))
1145 def executeFirst(self,argsin):
1146 """ Same than first part of self.execute to reduce memory peak."""
1147 def ArgInMananger(self,argsin):
1148 argsInPy = SeqByteReceiver( argsin )
1149 data = argsInPy.data()
1150 self.addInfoOnLevel2("inputMem",len(data))
1151 _,kws=pickle.loads(data)
1154 self.beginOfCurrentExecutionSession()
1155 self.addTimeInfoOnLevel2("startInputTime")
1156 # to force call of SeqByteReceiver's destructor
1157 kws = ArgInMananger(self,argsin)
1158 vis = InOutputObjVisitor()
1160 # fetch real data if necessary
1161 kws[elt] = UnProxyObjectSimple( kws[elt],vis)
1162 self.addInfoOnLevel2("inputHDDMem",vis)
1163 self.context.update(kws)
1164 self.addTimeInfoOnLevel2("endInputTime")
1166 exc_typ,exc_val,exc_fr=sys.exc_info()
1167 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1168 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1169 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))
1171 def executeSecond(self,outargsname):
1172 """ Same than second part of self.execute to reduce memory peak."""
1173 def executeSecondInternal(monitoringtimeresms):
1174 with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( monitoringtimeresms ) ) as monitoringParams:
1175 currentInstance = self.executeNow( outargsname )
1176 cpumeminfo = ReadCPUMemInfo( monitoringParams )
1177 return cpumeminfo, currentInstance
1181 self.addTimeInfoOnLevel2("startExecTime")
1183 self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
1184 cpumeminfo, self._current_execution_session._current_instance = executeSecondInternal( self.my_container_py.monitoringtimeresms() )
1186 self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
1187 self.addTimeInfoOnLevel2("endExecTime")
1188 self.addTimeInfoOnLevel2("startOutputTime")
1190 for arg in outargsname:
1191 if arg not in self.context:
1192 raise KeyError("There is no variable %s in context" % arg)
1193 argsout.append(self.context[arg])
1196 vis = InOutputObjVisitor()
1198 # the proxy mecanism is catched here
1199 argPickle = SpoolPickleObject( arg, vis )
1200 retArg = SenderByte_i( self.poa,argPickle )
1201 id_o = self.poa.activate_object(retArg)
1202 retObj = self.poa.id_to_reference(id_o)
1203 ret.append( retObj._narrow( SALOME.SenderByte ) )
1204 outputMem += len(argPickle)
1205 self.addInfoOnLevel2("outputMem",outputMem)
1206 self.addInfoOnLevel2("outputHDDMem",vis)
1207 self.addTimeInfoOnLevel2("endOutputTime")
1208 self.endOfCurrentExecutionSession()
1211 exc_typ,exc_val,exc_fr=sys.exc_info()
1212 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1213 print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1214 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))
1216 def listAllVarsInContext(self):
1218 pat = re.compile("^__([a-z]+)__$")
1219 return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]
1221 def removeAllVarsInContext(self):
1222 for elt in self.listAllVarsInContext():
1223 del self.context[elt]
1225 def getValueOfVarInContext(self,varName):
1227 return pickle.dumps(self.context[varName],-1)
1229 exc_typ,exc_val,exc_fr=sys.exc_info()
1230 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1231 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1234 def assignVarInContext(self, varName, value):
1236 self.context[varName][0] = pickle.loads(value)
1238 exc_typ,exc_val,exc_fr=sys.exc_info()
1239 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1240 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1243 def callMethodOnVarInContext(self, varName, methodName, args):
1245 return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
1247 exc_typ,exc_val,exc_fr=sys.exc_info()
1248 l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1249 raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1252 def beginOfCurrentExecutionSession(self):
1253 self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
1254 self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session
1256 def endOfCurrentExecutionSession(self):
1257 self._current_execution_session.finalizeAndPushToMaster()
1258 self._current_execution_session = None
  def addInfoOnLevel2(self, key, value):
    """Forward a key/value measurement to the current execution-session log."""
    self._current_execution_session.addInfoOnLevel2(key, value)
1263 def addTimeInfoOnLevel2(self, key):
1264 from datetime import datetime
1265 self._current_execution_session.addInfoOnLevel2(key,datetime.now())
class PyScriptNode_i(PyScriptNode_Abstract_i):
  """In-process execution strategy: the script runs inside this container."""
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # runs the pre-compiled code (self.ccode) directly in self.context
    return ExecLocal(self.ccode,self.context,outargsname,self.my_container,self._current_execution_session)
class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
  """Out-of-process execution strategy, without replay on crash."""
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # NOTE(review): relies on self.code (raw source), presumably set by the
    # base-class __init__ which is not fully visible here — confirm; the
    # sibling in-process variant uses self.ccode instead.
    return ExecCrashProofWithoutReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)
class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
  """Out-of-process execution strategy, with replay support on crash."""
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # NOTE(review): uses self.code (raw source), presumably set by the base
    # class — confirm (base __init__ not fully visible here).
    return ExecCrashProofWithReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)
class PyScriptNode_OutOfProcess_FT_i(PyScriptNode_Abstract_i):
  """Fault-tolerant out-of-process execution strategy, without replay."""
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # NOTE(review): uses self.code (raw source), presumably set by the base
    # class — confirm (base __init__ not fully visible here).
    return ExecCrashProofWithoutReplayFT(self.code,self.context,outargsname,self.my_container,self._current_execution_session)
class PyScriptNode_OutOfProcess_Replay_FT_i(PyScriptNode_Abstract_i):
  """Fault-tolerant out-of-process execution strategy, with replay support."""
  def __init__(self, nodeName, code, poa, my_container, logscript):
    super().__init__(nodeName, code, poa, my_container, logscript)

  def executeNow(self, outargsname):
    # NOTE(review): uses self.code (raw source), presumably set by the base
    # class — confirm (base __init__ not fully visible here).
    return ExecCrashProofWithReplayFT(self.code,self.context,outargsname,self.my_container,self._current_execution_session)