Salome HOME
[EDF29138] : measure CPU/Mem even in OutOfProcess mode
[modules/kernel.git] / src / Container / SALOME_PyNode.py
1 #  -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024  CEA, EDF, OPEN CASCADE
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #
20
21 #  File   : SALOME_PyNode.py
22 #  Author : Christian CAREMOLI, EDF
23 #  Module : SALOME
24 #  $Header$
25 #
26 import sys,traceback
27 import linecache
28 import pickle
29 import Engines__POA
30 import SALOME__POA
31 import SALOME
32 import logging
33 import abc
34 import os
35 import sys
36 from SALOME_ContainerHelper import ScriptExecInfo
37
38 MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
39
40 MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
41
42 class Generic(SALOME__POA.GenericObj):
43   """A Python implementation of the GenericObj CORBA IDL"""
44   def __init__(self,poa):
45     self.poa=poa
46     self.cnt=1
47
48   def Register(self):
49     #print("Register called : %d"%self.cnt)
50     self.cnt+=1
51
52   def UnRegister(self):
53     #print("UnRegister called : %d"%self.cnt)
54     self.cnt-=1
55     if self.cnt <= 0:
56       oid=self.poa.servant_to_id(self)
57       self.poa.deactivate_object(oid)
58
59   def Destroy(self):
60     print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
61     self.UnRegister()
62
63   def __del__(self):
64     #print("Destuctor called")
65     pass
66
67 class PyNode_i (Engines__POA.PyNode,Generic):
68   """The implementation of the PyNode CORBA IDL"""
69   def __init__(self, nodeName,code,poa,my_container):
70     """Initialize the node : compilation in the local context"""
71     Generic.__init__(self,poa)
72     self.nodeName=nodeName
73     self.code=code
74     self.my_container=my_container._container
75     linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
76     ccode=compile(code,nodeName,'exec')
77     self.context={}
78     self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
79     exec(ccode, self.context)
80
81   def getContainer(self):
82     return self.my_container
83
84   def getCode(self):
85     return self.code
86
87   def getName(self):
88     return self.nodeName
89
90   def defineNewCustomVar(self,varName,valueOfVar):
91     self.context[varName] = pickle.loads(valueOfVar)
92     pass
93
94   def executeAnotherPieceOfCode(self,code):
95     """Called for initialization of container lodging self."""
96     try:
97       ccode=compile(code,self.nodeName,'exec')
98       exec(ccode, self.context)
99     except Exception:
100       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
101
102   def execute(self,funcName,argsin):
103     """Execute the function funcName found in local context with pickled args (argsin)"""
104     try:
105       argsin,kws=pickle.loads(argsin)
106       func=self.context[funcName]
107       argsout=func(*argsin,**kws)
108       argsout=pickle.dumps(argsout,-1)
109       return argsout
110     except Exception:
111       exc_typ,exc_val,exc_fr=sys.exc_info()
112       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
113       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
114
115 class SenderByte_i(SALOME__POA.SenderByte,Generic):
116   def __init__(self,poa,bytesToSend):
117     Generic.__init__(self,poa)
118     self.bytesToSend = bytesToSend
119
120   def getSize(self):
121     return len(self.bytesToSend)
122
123   def sendPart(self,n1,n2):
124     return self.bytesToSend[n1:n2]
125
126 SALOME_FILE_BIG_OBJ_DIR = "SALOME_FILE_BIG_OBJ_DIR"
127     
128 SALOME_BIG_OBJ_ON_DISK_THRES_VAR = "SALOME_BIG_OBJ_ON_DISK_THRES"
129
130 # default is 50 MB
131 SALOME_BIG_OBJ_ON_DISK_THRES_DFT = 50000000
132
133 DicoForProxyFile = { }
134
135 def GetSizeOfBufferedReader(f):
136   """
137   This method returns in bytes size of a file openned.
138
139   Args:
140   ----
141       f (io.IOBase): buffered reader returned by open
142       
143   Returns
144   -------
145       int: number of bytes
146   """
147   import io
148   pos = f.tell()
149   f.seek(0,io.SEEK_END)
150   pos2 = f.tell()
151   f.seek(pos,io.SEEK_SET)
152   return pos2-pos
153
154 def GetObjectFromFile(fname, visitor = None):
155   with open(fname,"rb") as f:
156     if visitor:
157       visitor.setHDDMem( GetSizeOfBufferedReader(f) )
158       visitor.setFileName( fname )
159     obj = pickle.load(f)
160   return obj
161
162 def DumpInFile(obj,fname):
163   with open(fname,"wb") as f:
164     f.write( obj )
165
166 def IncrRefInFile(fname):
167   if fname in DicoForProxyFile:
168     DicoForProxyFile[fname] += 1
169   else:
170     DicoForProxyFile[fname] = 2
171   pass
172
173 def DecrRefInFile(fname):
174   if fname not in DicoForProxyFile:
175     cnt = 1
176   else:
177     cnt = DicoForProxyFile[fname]
178     DicoForProxyFile[fname] -= 1
179     if cnt == 1:
180       del DicoForProxyFile[fname]
181   if cnt == 1:
182     if os.path.exists(fname):
183       os.unlink( fname )
184   pass
185
186 def GetBigObjectOnDiskThreshold():
187   import os
188   if SALOME_BIG_OBJ_ON_DISK_THRES_VAR in os.environ:
189     return int( os.environ[SALOME_BIG_OBJ_ON_DISK_THRES_VAR] )
190   else:
191     return SALOME_BIG_OBJ_ON_DISK_THRES_DFT
192
193 def ActivateProxyMecanismOrNot( sizeInByte ):
194   thres = GetBigObjectOnDiskThreshold()
195   if thres == -1:
196     return False
197   else:
198     return sizeInByte > thres
199
200 def GetBigObjectDirectory():
201   import os
202   if SALOME_FILE_BIG_OBJ_DIR not in os.environ:
203     raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
204   return os.path.expanduser( os.path.expandvars( os.environ[SALOME_FILE_BIG_OBJ_DIR] ) )
205
206 def GetBigObjectFileName():
207   """
208   Return a filename in the most secure manner (see tempfile documentation)
209   """
210   import tempfile
211   with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
212     ret = f.name
213   return ret
214
215 class BigObjectOnDiskBase:
216   def __init__(self, fileName, objSerialized):
217     """
218     :param fileName: the file used to dump into.
219     :param objSerialized: the object in pickeled form
220     :type objSerialized: bytes
221     """
222     self._filename = fileName
223     # attribute _destroy is here to tell client side or server side
224     # only client side can be with _destroy set to True. server side due to risk of concurrency
225     # so pickled form of self must be done with this attribute set to False.
226     self._destroy = False
227     self.__dumpIntoFile(objSerialized)
228
229   def getDestroyStatus(self):
230     return self._destroy
231
232   def incrRef(self):
233     if self._destroy:
234       IncrRefInFile( self._filename )
235     else:
236       # should never happen !
237       RuntimeError("Invalid call to incrRef !")
238
239   def decrRef(self):
240     if self._destroy:
241       DecrRefInFile( self._filename )
242     else:
243       # should never happen !
244       RuntimeError("Invalid call to decrRef !")
245
246   def unlinkOnDestructor(self):
247     self._destroy = True
248
249   def doNotTouchFile(self):
250     """
251     Method called slave side. The life cycle management of file is client side not slave side.
252     """
253     self._destroy = False
254
255   def __del__(self):
256     if self._destroy:
257       DecrRefInFile( self._filename )
258
259   def getFileName(self):
260     return self._filename
261   
262   def __dumpIntoFile(self, objSerialized):
263     DumpInFile( objSerialized, self._filename )
264
265   def get(self, visitor = None):
266     obj = GetObjectFromFile( self._filename, visitor )
267     return obj
268
269   def __float__(self):
270     return float( self.get() )
271     
272   def __int__(self):
273     return int( self.get() )
274     
275   def __str__(self):
276     obj = self.get()
277     if isinstance(obj,str):
278         return obj
279     else:
280         raise RuntimeError("Not a string")
281       
282 class BigObjectOnDisk(BigObjectOnDiskBase):
283   def __init__(self, fileName, objSerialized):
284     BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
285     
286 class BigObjectOnDiskListElement(BigObjectOnDiskBase):
287   def __init__(self, pos, length, fileName):
288     self._filename = fileName
289     self._destroy = False
290     self._pos = pos
291     self._length = length
292
293   def get(self, visitor = None):
294     fullObj = BigObjectOnDiskBase.get(self, visitor)
295     return fullObj[ self._pos ]
296     
297   def __getitem__(self, i):
298     return self.get()[i]
299
300   def __len__(self):
301     return len(self.get())
302     
303 class BigObjectOnDiskSequence(BigObjectOnDiskBase):
304   def __init__(self, length, fileName, objSerialized):
305     BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
306     self._length = length
307
308   def __getitem__(self, i):
309     return BigObjectOnDiskListElement(i, self._length, self.getFileName())
310
311   def __len__(self):
312     return self._length
313
314 class BigObjectOnDiskList(BigObjectOnDiskSequence):
315   def __init__(self, length, fileName, objSerialized):
316     BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
317     
318 class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
319   def __init__(self, length, fileName, objSerialized):
320     BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
321
322 def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
323   """
324   This method return a proxy instance of pickled form of object given in input.
325
326   Args:
327   ----
328       obj (pickelable type) : object to be proxified
329       pickleObjInit (bytes) : Optionnal. Original pickeled form of object to be proxyfied if already computed. If not this method generate it
330
331   Returns
332   -------
333       BigObjectOnDiskBase: proxy instance
334   """
335   pickleObj = pickleObjInit
336   if pickleObj is None:
337     pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
338   fileName = GetBigObjectFileName()
339   if visitor:
340     visitor.setHDDMem( len(pickleObj) )
341     visitor.setFileName(fileName)
342   if isinstance( obj, list):
343     proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
344   elif isinstance( obj, tuple):
345     proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
346   else:
347     proxyObj = BigObjectOnDisk( fileName , pickleObj )
348   return proxyObj
349
350 def SpoolPickleObject( obj, visitor = None ):
351   import pickle
352   with InOutputObjVisitorCM(visitor) as v:
353     pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
354     if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
355       return pickleObjInit
356     else:
357       proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
358       pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
359       return pickleProxy
360
361 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
362
363 def UnProxyObjectSimple( obj, visitor = None ):
364   """
365   Method to be called in Remote mode. Alterate the obj _status attribute. 
366   Because the slave process does not participate in the reference counting
367   
368   Args:
369   ----
370       visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
371
372   """
373   with InOutputObjVisitorCM(visitor) as v:
374     logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
375     if isinstance(obj,BigObjectOnDiskBase):
376       obj.doNotTouchFile()
377       return obj.get( v )
378     elif isinstance( obj, list):
379       retObj = []
380       for elt in obj:
381         retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
382       return retObj
383     else:
384       return obj
385
386 def UnProxyObjectSimpleLocal( obj ):
387   """
388   Method to be called in Local mode. Do not alterate the PyObj counter
389   """
390   if isinstance(obj,BigObjectOnDiskBase):
391     return obj.get()
392   elif isinstance( obj, list):
393     retObj = []
394     for elt in obj:
395       retObj.append( UnProxyObjectSimpleLocal(elt) )
396     return retObj
397   else:
398     return obj
399   
400 class FileHolder:
401   def __init__(self, fileName):
402     self._filename = fileName
403   @property
404   def filename(self):
405     return self._filename
406   
407 class FileDeleter(FileHolder):
408   def __init__(self, fileName):
409     super().__init__( fileName )
410   def __del__(self):
411     import os
412     if os.path.exists( self._filename ):
413       os.unlink( self._filename )
414
415 class MonitoringInfo:
416   def __init__(self, pyFileName, intervalInMs, outFileName, pid):
417     self._py_file_name = pyFileName
418     self._interval_in_ms = intervalInMs
419     self._out_file_name = outFileName
420     self._pid = pid
421
422   @property
423   def pyFileName(self):
424     return self._py_file_name
425
426   @property
427   def pid(self):
428     return self._pid
429   
430   @pid.setter
431   def pid(self, value):
432     self._pid = value
433
434   @property
435   def outFileName(self):
436     return self._out_file_name
437   
438   @property
439   def intervalInMs(self):
440     return self._interval_in_ms
441   
442 def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
443     """
444     This method loops indefinitely every intervalInMs milliseconds to scan 
445     number of inodes and size of content recursively included into the in input directory.
446
447     Args:
448     ----
449
450     outFileName (str) : name of file inside the results will be written. If None a new file is generated
451
452     See also CPUMemoryMonitoring
453     """
454     global orb
455     import os
456     dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
457     import tempfile
458     import logging
459     import KernelBasis
460     # outFileNameSave stores the content of outFileName during phase of dumping
461     with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
462       outFileNameSave = f.name
463     with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
464       tempPyFile = f.name
465     tempOutFile = outFileName
466     if tempOutFile is None:
467       tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
468     with open(tempPyFile,"w") as f:
469         f.write("""
470 import subprocess as sp
471 import re
472 import os
473 import time
474 import datetime
475 with open("{tempOutFile}","a") as f:
476   f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
477   f.write( "{{}}\\n".format( "{intervalInMs}" ) )
478   while(True):
479     nbinodes = -1
480     try:
481       nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]),  ), shell = True).decode().strip()
482     except:
483       pass
484     szOfDirStr = "fail"
485     try:
486       st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
487       szOfDirStr = re.split("[\s]+",st)[0]
488     except:
489       pass
490     f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
491     f.write( "{{}}\\n".format( str( nbinodes  ) ) )
492     f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
493     f.flush()
494     time.sleep( {intervalInMs} / 1000.0 )
495 """.format( **locals()))
496     logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
497     pyFileName = FileDeleter( tempPyFile )
498     if outFileName is None:
499       outFileName = FileDeleter( tempOutFile )
500     else:
501       outFileName = FileHolder(outFileName)
502     return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
503
504 def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
505   """
506   Launch a subprocess monitoring self process.
507   This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
508   CPU usage and RSS memory of the calling process.
509   Communication between subprocess and self is done by file.
510
511   Args:
512   ----
513     outFileName (str) : name of file inside the results will be written. If None a new file is generated
514
515   See also FileSystemMonitoring
516   """
517   import KernelBasis
518   def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
519     import os
520     import tempfile
521     with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
522       tempPyFile = f.name
523     tempOutFile = outFileName
524     if tempOutFile is None:
525       tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
526     pid = os.getpid()
527     with open(tempPyFile,"w") as f:
528       f.write("""import psutil
529 pid = {}
530 process = psutil.Process( pid )
531 def getChargeOf( p ):
532   a,b = p.cpu_percent(), p.memory_info().rss
533   try:
534     for c in p.children():
535       a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
536   except:
537     pass
538   return a,b
539 import time
540 with open("{}","a") as f:
541   f.write( "{{}}\\n".format( "{}" ) )
542   while True:
543     cpu,mem_rss = getChargeOf( process )
544     f.write( "{{}}\\n".format( str( cpu ) ) )
545     f.write( "{{}}\\n".format( str( mem_rss  ) ) )
546     f.flush()
547     time.sleep( {} / 1000.0 )
548 """.format(pid, tempOutFile, intervalInMs, intervalInMs))
549     if outFileName is None:
550       autoOutFile = FileDeleter(tempOutFile)
551     else:
552       autoOutFile = FileHolder(tempOutFile)
553     return FileDeleter(tempPyFile),autoOutFile
554   pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
555   return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
556
557 class GenericPythonMonitoringLauncherCtxMgr:
558     def __init__(self, monitoringParams):
559         """
560         Args:
561         ----
562             monitoringParams (MonitoringInfo)
563         """
564         self._monitoring_params = monitoringParams
565     def __enter__(self):
566         import KernelBasis
567         pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
568         self._monitoring_params.pid = pid
569         return self._monitoring_params
570     
571     def __exit__(self,exctype, exc, tb):
572         StopMonitoring( self._monitoring_params )
573
574 def StopMonitoring( monitoringInfo ):
575   """
576   Kill monitoring subprocess.
577
578   Args:
579   ----
580       monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
581   """
582   import KernelBasis
583   KernelBasis.StopMonitoring(monitoringInfo.pid)
584
585 class CPUMemInfo:
586   def __init__(self, intervalInMs, cpu, mem_rss):
587     """
588     Args:
589     ----
590     intervalInMs (int)
591     cpu (list<float>)  CPU usage
592     mem_rss (list<int>) rss memory usage
593     """
594     self._interval_in_ms = intervalInMs
595     self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
596   def __str__(self):
597     st = """Interval in ms : {self.intervalInMs}
598 Data : ${self.data}
599 """.format( **locals() )
600     return st
601   @property
602   def intervalInMs(self):
603     return self._interval_in_ms
604   @property
605   def data(self):
606     """
607     list of triplets. First param of pair is cpu usage 
608                       Second param of pair is memory usage
609     """
610     return self._data
611
612 def ReadCPUMemInfoInternal( fileName ):
613   intervalInMs = 0
614   cpu = [] ; mem_rss = []
615   if os.path.exists( fileName ):
616     try:
617       with open(fileName, "r") as f:
618         coarseData = [ elt.strip() for elt in f.readlines() ]
619       intervalInMs = int( coarseData[0] )
620       coarseData = coarseData[1:]
621       cpu = [float(elt) for elt in coarseData[::2]]
622       mem_rss = [ int(elt) for elt in coarseData[1::2]]
623     except:
624       pass
625   return CPUMemInfo(intervalInMs,cpu,mem_rss)
626
627 def ReadCPUMemInfo( monitoringInfo ):
628   """
629   Retrieve CPU/Mem data of monitoring.
630
631   Args:
632   ----
633       monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
634   
635   Returns
636   -------
637     CPUMemInfo instance
638   """
639   return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
640
641 class InodeSizeInfo:
642   def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
643     """
644     Args:
645     ----
646     timeStamps (list<datetimestruct>)
647     nbInodes (list<int>)
648     volumeOfDir (list<str>)
649     """
650     self._dir_name_monitored = dirNameMonitored
651     self._interval_in_ms = intervalInMs
652     self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
653   def __str__(self):
654     st = """Filename monitored : {self.dirNameMonitored}
655 Interval in ms : ${self.intervalInMs}
656 Data : ${self.data}
657 """.format( **locals() )
658     return st
659   @property
660   def dirNameMonitored(self):
661     return self._dir_name_monitored
662   @property
663   def intervalInMs(self):
664     return self._interval_in_ms
665   @property
666   def data(self):
667     """
668     list of triplets. First param of triplet is datetimestruct
669                                       Second param of triplet is #inodes.
670                                       Thirst param of triplet is size.
671     """
672     return self._data
673
674 def ReadInodeSizeInfoInternal( fileName ):
675   import datetime
676   import os
677   with open(fileName, "r") as f:
678     coarseData = [ elt.strip() for elt in f.readlines() ]
679   dirNameMonitored = coarseData[0] ; intervalInMs = int( coarseData[1] ) ; coarseData = coarseData[2:]
680   tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
681   nbInodes = [int(elt) for elt in coarseData[1::3]]
682   volumeOfDir = coarseData[2::3]
683   return InodeSizeInfo(dirNameMonitored,intervalInMs,tss,nbInodes,volumeOfDir)
684
685 def ReadInodeSizeInfo( monitoringInfo ):
686   """
687   Retrieve nb of inodes and size of monitoring
688
689   Args:
690   ----
691       monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
692
693   Returns
694   -------
695     InodeSizeInfo
696   """
697   return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
698
699 class SeqByteReceiver:
700   # 2GB limit to trigger split into chunks
701   CHUNK_SIZE = 2000000000
702   def __init__(self,sender):
703     self._obj = sender
704   def __del__(self):
705     self._obj.UnRegister()
706     pass
707   def data(self):
708     size = self._obj.getSize()
709     if size <= SeqByteReceiver.CHUNK_SIZE:
710       return self.fetchOneShot( size )
711     else:
712       return self.fetchByChunks( size )
713   def fetchOneShot(self,size):
714     return self._obj.sendPart(0,size)
715   def fetchByChunks(self,size):
716       """
717       To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
718       """
719       data_for_split_case = bytes(0)
720       EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
721       iStart = 0 ; iEnd = EFF_CHUNK_SIZE
722       while iStart!=iEnd and iEnd <= size:
723         part = self._obj.sendPart(iStart,iEnd)
724         data_for_split_case = bytes(0).join( [data_for_split_case,part] )
725         iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
726       return data_for_split_case
727   
728 FinalCode = """import pickle
729 from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
730 import CORBA
731 import Engines
732 orb = CORBA.ORB_init([''])
733 codeFileName = "{}"
734 inputFileName = "{}"
735 outputFileName = "{}"
736 outputsKeys = {}
737 exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
738 with open(inputFileName,"rb") as f:
739   context = pickle.load( f )
740 context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
741 with open(codeFileName,"r") as f:
742   code = f.read()
743 # go for execution
744 exec( code , context )
745 # filter part of context to be exported to father process
746 context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
747 #
748 with open(outputFileName,"wb") as f:
749   pickle.dump( context, f )
750 """
751
752 class PythonFunctionEvaluatorParams:
753   def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
754     self._main_filename = mainFileName
755     self._code_filename = codeFileName
756     self._in_context_filename = inContextFileName
757     self._out_context_filename = outContextFileName
758   @property
759   def result(self):
760     import pickle
761     with open(self._out_context_filename,"rb") as f:
762       return pickle.load( f )
763   def destroyOnOK(self):
764     for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
765       if os.path.exists( fileToDestroy ):
766         os.unlink( fileToDestroy )
767   def destroyOnKO(self, containerRef):
768      """
769      Called in the context of failure with replay mode activated
770      """
771      for fileToDestroy in [self._out_context_filename]:
772       if os.path.exists( fileToDestroy ):
773         os.unlink( fileToDestroy )
774       # register to container files group associated to the
775       containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
776   @property
777   def replayCmd(self):
778     return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))
779   
780   @property
781   def cleanOperations(self):
782     import os
783     return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )
784
785   def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
786     if returnCode == -1:
787       return f"return with non zero code ({returnCode})"
788     else:
789       banner = 200*"*"
790       if keepFilesToReplay:
791         return f"""return with non zero code ({returnCode})
792 {banner}
793 Looks like a hard crash as returnCode {returnCode} != 0
794 {self.replayCmd}
795 {self.cleanOperations}
796 {banner}
797 """
798       else:
799         return f"""return with non zero code ({returnCode})
800 {banner}
801 Looks like a hard crash as returnCode {returnCode} != 0
802 {banner}
803 """
804
805 def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay ):
806   """
807   Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.
808   
809   Args:
810   -----
811
812   code (str) : python code to be executed using context
813   context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
814   outargsname (list<str>) : list of arguments to be exported 
815   containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
816   instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
817   keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
818
819   Return:
820   -------
821
822   ScriptExecInfo : instance serverside
823
824   In/Out:
825   -------
826
827   context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
828   """
829   import tempfile
830   import pickle
831   import subprocess as sp
832   import CORBA
833   #
834   def InternalExecResistant( code, context, outargsname):
835     orb = CORBA.ORB_init([''])
836     iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
837     ####
838     EXEC_CODE_FNAME_PXF = "execsafe_"
839     def RetrieveUniquePartFromPfx( fname ):
840       return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
841     with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
842       codeFd.write( code )
843       codeFd.flush()
844       codeFileName = os.path.basename( codeFd.name )
845       contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) )
846       with open(contextFileName,"wb") as contextFd:
847         pickle.dump( context, contextFd)
848       resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) )
849       mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName  ) ) )
850       with open(mainExecFileName,"w") as f:
851         f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
852       p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
853       stdout, stderr = p.communicate()
854       returnCode = p.returncode
855     return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
856   ret = instanceOfLogOfCurrentSession._current_instance
857   returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
858   stdout = stdout.decode()
859   stderr = stderr.decode()
860   sys.stdout.write( stdout ) ; sys.stdout.flush()
861   sys.stderr.write( stderr ) ; sys.stderr.flush()
862   if returnCode == 0:
863     pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
864     if len(pcklData) > 0:
865       ret = pickle.loads( pcklData )
866     context.update( evParams.result )
867     evParams.destroyOnOK()
868     return ret
869   if returnCode != 0:
870     if keepFilesToReplay:
871       evParams.destroyOnKO( containerRef )
872     else:
873       evParams.destroyOnOK()
874     raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
875
876 def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
877   return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True)
878
879 def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
880   return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False)
881
882 def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
883   exec( code, context )
884   return instanceOfLogOfCurrentSession._current_instance
885
886 class LogOfCurrentExecutionSessionAbs(abc.ABC):
887   def __init__(self):
888     self._current_instance = ScriptExecInfo()
889
890   def addInfoOnLevel2(self, key, value):
891     setattr(self._current_instance,key,value)
892
893   @abc.abstractmethod
894   def addFreestyleAndFlush(self, value):
895     raise RuntimeError("Must be overloaded")
896
897 class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
898   def __init__(self, handleToCentralizedInst):
899     super().__init__()
900     self._remote_handle = handleToCentralizedInst
901
902   def addFreestyleAndFlush(self, value):
903     self._current_instance.freestyle = value
904     self.finalizeAndPushToMaster()
905
906   def finalizeAndPushToMaster(self):
907     self._remote_handle.assign( pickle.dumps( self._current_instance ) )
908
909 class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
910   """
911   This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
912   """
913   def __init__(self, handleToCentralizedInst = None):
914     super().__init__()
915   def addFreestyleAndFlush(self, value):
916     pass
917
918 class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
919   """The implementation of the PyScriptNode CORBA IDL that executes a script"""
920   def __init__(self, nodeName, code, poa, my_container, logscript):
921     """Initialize the node : compilation in the local context"""
922     Generic.__init__(self,poa)
923     self.nodeName=nodeName
924     self.code=code
925     self.my_container_py = my_container
926     self.my_container=my_container._container
927     linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
928     self.ccode=compile(code,nodeName,'exec')
929     self.context={}
930     self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
931     self._log_script = logscript
932     self._current_execution_session = None
933     sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session
934
935   @abc.abstractmethod
936   def executeNow(self, outargsname):
937     raise RuntimeError("Must be overloaded")
938       
939   def __del__(self):
940     # force removal of self.context. Don t know why it s not done by default
941     self.removeAllVarsInContext()
942     pass
943
944   def getContainer(self):
945     return self.my_container
946
947   def getCode(self):
948     return self.code
949
950   def getName(self):
951     return self.nodeName
952
953   def defineNewCustomVar(self,varName,valueOfVar):
954     self.context[varName] = pickle.loads(valueOfVar)
955     pass
956
957   def executeAnotherPieceOfCode(self,code):
958     """Called for initialization of container lodging self."""
959     try:
960       ccode=compile(code,self.nodeName,'exec')
961       exec(ccode, self.context)
962     except Exception:
963       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
964
965   def assignNewCompiledCode(self,codeStr):
966     try:
967       self.code=codeStr
968       self.ccode=compile(codeStr,self.nodeName,'exec')
969     except Exception:
970       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))
971
972   def executeSimple(self, key, val):
973     """
974     Same as execute method except that no pickelization mecanism is implied here. No output is expected
975     """
976     try:
977       self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
978       exec(self.ccode,self.context)
979     except Exception:
980       exc_typ,exc_val,exc_fr=sys.exc_info()
981       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
982       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
983       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))
984     
985   def execute(self,outargsname,argsin):
986     """Execute the script stored in attribute ccode with pickled args (argsin)"""
987     try:
988       argsname,kws=pickle.loads(argsin)
989       self.context.update(kws)
990       exec(self.ccode, self.context)
991       argsout=[]
992       for arg in outargsname:
993         if arg not in self.context:
994           raise KeyError("There is no variable %s in context" % arg)
995         argsout.append(self.context[arg])
996       argsout=pickle.dumps(tuple(argsout),-1)
997       return argsout
998     except Exception:
999       exc_typ,exc_val,exc_fr=sys.exc_info()
1000       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1001       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1002       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))
1003
1004   def executeFirst(self,argsin):
1005     """ Same than first part of self.execute to reduce memory peak."""
1006     def ArgInMananger(self,argsin):
1007       argsInPy = SeqByteReceiver( argsin )
1008       data = argsInPy.data()
1009       self.addInfoOnLevel2("inputMem",len(data))
1010       _,kws=pickle.loads(data)
1011       return kws
1012     try:
1013       self.beginOfCurrentExecutionSession()
1014       self.addTimeInfoOnLevel2("startInputTime")
1015       # to force call of SeqByteReceiver's destructor
1016       kws = ArgInMananger(self,argsin)
1017       vis = InOutputObjVisitor()
1018       for elt in kws:
1019         # fetch real data if necessary
1020         kws[elt] = UnProxyObjectSimple( kws[elt],vis)
1021       self.addInfoOnLevel2("inputHDDMem",vis)
1022       self.context.update(kws)
1023       self.addTimeInfoOnLevel2("endInputTime")
1024     except Exception:
1025       exc_typ,exc_val,exc_fr=sys.exc_info()
1026       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1027       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1028       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))
1029
1030   def executeSecond(self,outargsname):
1031     """ Same than second part of self.execute to reduce memory peak."""
1032     import sys
1033     try:
1034       self.addTimeInfoOnLevel2("startExecTime")
1035       ##
1036       self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
1037       with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams:
1038         self._current_execution_session._current_instance = self.executeNow( outargsname )
1039         cpumeminfo = ReadCPUMemInfo( monitoringParams )
1040       ##
1041       self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
1042       del monitoringParams
1043       self.addTimeInfoOnLevel2("endExecTime")
1044       self.addTimeInfoOnLevel2("startOutputTime")
1045       argsout=[]
1046       for arg in outargsname:
1047         if arg not in self.context:
1048           raise KeyError("There is no variable %s in context" % arg)
1049         argsout.append(self.context[arg])
1050       ret = [ ]
1051       outputMem = 0
1052       vis = InOutputObjVisitor()
1053       for arg in argsout:
1054         # the proxy mecanism is catched here
1055         argPickle = SpoolPickleObject( arg, vis )
1056         retArg = SenderByte_i( self.poa,argPickle )
1057         id_o = self.poa.activate_object(retArg)
1058         retObj = self.poa.id_to_reference(id_o)
1059         ret.append( retObj._narrow( SALOME.SenderByte ) )
1060         outputMem += len(argPickle)
1061       self.addInfoOnLevel2("outputMem",outputMem)
1062       self.addInfoOnLevel2("outputHDDMem",vis)
1063       self.addTimeInfoOnLevel2("endOutputTime")
1064       self.endOfCurrentExecutionSession()
1065       return ret
1066     except Exception:
1067       exc_typ,exc_val,exc_fr=sys.exc_info()
1068       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1069       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1070       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))
1071
1072   def listAllVarsInContext(self):
1073       import re
1074       pat = re.compile("^__([a-z]+)__$")
1075       return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]
1076       
1077   def removeAllVarsInContext(self):
1078       for elt in self.listAllVarsInContext():
1079         del self.context[elt]
1080
1081   def getValueOfVarInContext(self,varName):
1082     try:
1083       return pickle.dumps(self.context[varName],-1)
1084     except Exception:
1085       exc_typ,exc_val,exc_fr=sys.exc_info()
1086       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1087       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1088     pass
1089   
1090   def assignVarInContext(self, varName, value):
1091     try:
1092       self.context[varName][0] = pickle.loads(value)
1093     except Exception:
1094       exc_typ,exc_val,exc_fr=sys.exc_info()
1095       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1096       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1097     pass
1098
1099   def callMethodOnVarInContext(self, varName, methodName, args):
1100     try:
1101       return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
1102     except Exception:
1103       exc_typ,exc_val,exc_fr=sys.exc_info()
1104       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1105       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1106     pass
1107
1108   def beginOfCurrentExecutionSession(self):
1109     self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
1110     self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session
1111   
1112   def endOfCurrentExecutionSession(self):
1113     self._current_execution_session.finalizeAndPushToMaster()
1114     self._current_execution_session = None
1115
1116   def addInfoOnLevel2(self, key, value):
1117     self._current_execution_session.addInfoOnLevel2(key, value)
1118       
1119   def addTimeInfoOnLevel2(self, key):
1120     from datetime import datetime
1121     self._current_execution_session.addInfoOnLevel2(key,datetime.now())
1122
1123 class PyScriptNode_i(PyScriptNode_Abstract_i):
1124   def __init__(self, nodeName, code, poa, my_container, logscript):
1125     super().__init__(nodeName, code, poa, my_container, logscript)
1126
1127   def executeNow(self, outargsname):
1128     return ExecLocal(self.ccode,self.context,outargsname,self.my_container,self._current_execution_session)
1129     
1130 class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
1131   def __init__(self, nodeName, code, poa, my_container, logscript):
1132     super().__init__(nodeName, code, poa, my_container, logscript)
1133
1134   def executeNow(self, outargsname):
1135     return ExecCrashProofWithoutReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)
1136
1137 class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
1138   def __init__(self, nodeName, code, poa, my_container, logscript):
1139     super().__init__(nodeName, code, poa, my_container, logscript)
1140
1141   def executeNow(self, outargsname):
1142     return ExecCrashProofWithReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)