Salome HOME
[EDF29852] : Mecanism of fault tolerant in SALOME_Container to resist against emitted...
[modules/kernel.git] / src / Container / SALOME_PyNode.py
1 #  -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024  CEA, EDF, OPEN CASCADE
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #
20
21 #  File   : SALOME_PyNode.py
22 #  Author : Christian CAREMOLI, EDF
23 #  Module : SALOME
24 #  $Header$
25 #
26 import sys,traceback
27 import linecache
28 import pickle
29 import Engines__POA
30 import SALOME__POA
31 import SALOME
32 import logging
33 import abc
34 import os
35 import sys
36 from SALOME_ContainerHelper import ScriptExecInfo
37
38 MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
39
40 MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
41
42 class Generic(SALOME__POA.GenericObj):
43   """A Python implementation of the GenericObj CORBA IDL"""
44   def __init__(self,poa):
45     self.poa=poa
46     self.cnt=1
47
48   def Register(self):
49     #print("Register called : %d"%self.cnt)
50     self.cnt+=1
51
52   def UnRegister(self):
53     #print("UnRegister called : %d"%self.cnt)
54     self.cnt-=1
55     if self.cnt <= 0:
56       oid=self.poa.servant_to_id(self)
57       self.poa.deactivate_object(oid)
58
59   def Destroy(self):
60     print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
61     self.UnRegister()
62
63   def __del__(self):
64     #print("Destuctor called")
65     pass
66
67 class PyNode_i (Engines__POA.PyNode,Generic):
68   """The implementation of the PyNode CORBA IDL"""
69   def __init__(self, nodeName,code,poa,my_container):
70     """Initialize the node : compilation in the local context"""
71     Generic.__init__(self,poa)
72     self.nodeName=nodeName
73     self.code=code
74     self.my_container=my_container._container
75     linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
76     ccode=compile(code,nodeName,'exec')
77     self.context={}
78     self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
79     exec(ccode, self.context)
80
81   def getContainer(self):
82     return self.my_container
83
84   def getCode(self):
85     return self.code
86
87   def getName(self):
88     return self.nodeName
89
90   def defineNewCustomVar(self,varName,valueOfVar):
91     self.context[varName] = pickle.loads(valueOfVar)
92     pass
93
94   def executeAnotherPieceOfCode(self,code):
95     """Called for initialization of container lodging self."""
96     try:
97       ccode=compile(code,self.nodeName,'exec')
98       exec(ccode, self.context)
99     except Exception:
100       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
101
102   def execute(self,funcName,argsin):
103     """Execute the function funcName found in local context with pickled args (argsin)"""
104     try:
105       argsin,kws=pickle.loads(argsin)
106       func=self.context[funcName]
107       argsout=func(*argsin,**kws)
108       argsout=pickle.dumps(argsout,-1)
109       return argsout
110     except Exception:
111       exc_typ,exc_val,exc_fr=sys.exc_info()
112       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
113       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
114
115 class SenderByte_i(SALOME__POA.SenderByte,Generic):
116   def __init__(self,poa,bytesToSend):
117     Generic.__init__(self,poa)
118     self.bytesToSend = bytesToSend
119
120   def getSize(self):
121     return len(self.bytesToSend)
122
123   def sendPart(self,n1,n2):
124     return self.bytesToSend[n1:n2]
125
126 SALOME_FILE_BIG_OBJ_DIR = "SALOME_FILE_BIG_OBJ_DIR"
127     
128 SALOME_BIG_OBJ_ON_DISK_THRES_VAR = "SALOME_BIG_OBJ_ON_DISK_THRES"
129
130 # default is 50 MB
131 SALOME_BIG_OBJ_ON_DISK_THRES_DFT = 50000000
132
133 DicoForProxyFile = { }
134
135 def GetSizeOfBufferedReader(f):
136   """
137   This method returns in bytes size of a file openned.
138
139   Args:
140   ----
141       f (io.IOBase): buffered reader returned by open
142       
143   Returns
144   -------
145       int: number of bytes
146   """
147   import io
148   pos = f.tell()
149   f.seek(0,io.SEEK_END)
150   pos2 = f.tell()
151   f.seek(pos,io.SEEK_SET)
152   return pos2-pos
153
154 def GetObjectFromFile(fname, visitor = None):
155   with open(fname,"rb") as f:
156     if visitor:
157       visitor.setHDDMem( GetSizeOfBufferedReader(f) )
158       visitor.setFileName( fname )
159     obj = pickle.load(f)
160   return obj
161
162 def DumpInFile(obj,fname):
163   with open(fname,"wb") as f:
164     f.write( obj )
165
166 def IncrRefInFile(fname):
167   if fname in DicoForProxyFile:
168     DicoForProxyFile[fname] += 1
169   else:
170     DicoForProxyFile[fname] = 2
171   pass
172
173 def DecrRefInFile(fname):
174   if fname not in DicoForProxyFile:
175     cnt = 1
176   else:
177     cnt = DicoForProxyFile[fname]
178     DicoForProxyFile[fname] -= 1
179     if cnt == 1:
180       del DicoForProxyFile[fname]
181   if cnt == 1:
182     if os.path.exists(fname):
183       os.unlink( fname )
184   pass
185
186 def GetBigObjectOnDiskThreshold():
187   import os
188   if SALOME_BIG_OBJ_ON_DISK_THRES_VAR in os.environ:
189     return int( os.environ[SALOME_BIG_OBJ_ON_DISK_THRES_VAR] )
190   else:
191     return SALOME_BIG_OBJ_ON_DISK_THRES_DFT
192
193 def ActivateProxyMecanismOrNot( sizeInByte ):
194   thres = GetBigObjectOnDiskThreshold()
195   if thres == -1:
196     return False
197   else:
198     return sizeInByte > thres
199
200 def GetBigObjectDirectory():
201   import os
202   if SALOME_FILE_BIG_OBJ_DIR not in os.environ:
203     raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
204   return os.path.expanduser( os.path.expandvars( os.environ[SALOME_FILE_BIG_OBJ_DIR] ) )
205
206 def GetBigObjectFileName():
207   """
208   Return a filename in the most secure manner (see tempfile documentation)
209   """
210   import tempfile
211   with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
212     ret = f.name
213   return ret
214
215 class BigObjectOnDiskBase:
216   def __init__(self, fileName, objSerialized):
217     """
218     :param fileName: the file used to dump into.
219     :param objSerialized: the object in pickeled form
220     :type objSerialized: bytes
221     """
222     self._filename = fileName
223     # attribute _destroy is here to tell client side or server side
224     # only client side can be with _destroy set to True. server side due to risk of concurrency
225     # so pickled form of self must be done with this attribute set to False.
226     self._destroy = False
227     self.__dumpIntoFile(objSerialized)
228
229   def getDestroyStatus(self):
230     return self._destroy
231
232   def incrRef(self):
233     if self._destroy:
234       IncrRefInFile( self._filename )
235     else:
236       # should never happen !
237       RuntimeError("Invalid call to incrRef !")
238
239   def decrRef(self):
240     if self._destroy:
241       DecrRefInFile( self._filename )
242     else:
243       # should never happen !
244       RuntimeError("Invalid call to decrRef !")
245
246   def unlinkOnDestructor(self):
247     self._destroy = True
248
249   def doNotTouchFile(self):
250     """
251     Method called slave side. The life cycle management of file is client side not slave side.
252     """
253     self._destroy = False
254
255   def __del__(self):
256     if self._destroy:
257       DecrRefInFile( self._filename )
258
259   def getFileName(self):
260     return self._filename
261   
262   def __dumpIntoFile(self, objSerialized):
263     DumpInFile( objSerialized, self._filename )
264
265   def get(self, visitor = None):
266     obj = GetObjectFromFile( self._filename, visitor )
267     return obj
268
269   def __float__(self):
270     return float( self.get() )
271     
272   def __int__(self):
273     return int( self.get() )
274     
275   def __str__(self):
276     obj = self.get()
277     if isinstance(obj,str):
278         return obj
279     else:
280         raise RuntimeError("Not a string")
281       
282 class BigObjectOnDisk(BigObjectOnDiskBase):
283   def __init__(self, fileName, objSerialized):
284     BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
285     
286 class BigObjectOnDiskListElement(BigObjectOnDiskBase):
287   def __init__(self, pos, length, fileName):
288     self._filename = fileName
289     self._destroy = False
290     self._pos = pos
291     self._length = length
292
293   def get(self, visitor = None):
294     fullObj = BigObjectOnDiskBase.get(self, visitor)
295     return fullObj[ self._pos ]
296     
297   def __getitem__(self, i):
298     return self.get()[i]
299
300   def __len__(self):
301     return len(self.get())
302     
303 class BigObjectOnDiskSequence(BigObjectOnDiskBase):
304   def __init__(self, length, fileName, objSerialized):
305     BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
306     self._length = length
307
308   def __getitem__(self, i):
309     return BigObjectOnDiskListElement(i, self._length, self.getFileName())
310
311   def __len__(self):
312     return self._length
313
314 class BigObjectOnDiskList(BigObjectOnDiskSequence):
315   def __init__(self, length, fileName, objSerialized):
316     BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
317     
318 class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
319   def __init__(self, length, fileName, objSerialized):
320     BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
321
322 def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
323   """
324   This method return a proxy instance of pickled form of object given in input.
325
326   Args:
327   ----
328       obj (pickelable type) : object to be proxified
329       pickleObjInit (bytes) : Optionnal. Original pickeled form of object to be proxyfied if already computed. If not this method generate it
330
331   Returns
332   -------
333       BigObjectOnDiskBase: proxy instance
334   """
335   pickleObj = pickleObjInit
336   if pickleObj is None:
337     pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
338   fileName = GetBigObjectFileName()
339   if visitor:
340     visitor.setHDDMem( len(pickleObj) )
341     visitor.setFileName(fileName)
342   if isinstance( obj, list):
343     proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
344   elif isinstance( obj, tuple):
345     proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
346   else:
347     proxyObj = BigObjectOnDisk( fileName , pickleObj )
348   return proxyObj
349
350 def SpoolPickleObject( obj, visitor = None ):
351   import pickle
352   with InOutputObjVisitorCM(visitor) as v:
353     pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
354     if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
355       return pickleObjInit
356     else:
357       proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
358       pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
359       return pickleProxy
360
361 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
362
363 def UnProxyObjectSimple( obj, visitor = None ):
364   """
365   Method to be called in Remote mode. Alterate the obj _status attribute. 
366   Because the slave process does not participate in the reference counting
367   
368   Args:
369   ----
370       visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
371
372   """
373   with InOutputObjVisitorCM(visitor) as v:
374     logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
375     if isinstance(obj,BigObjectOnDiskBase):
376       obj.doNotTouchFile()
377       return obj.get( v )
378     elif isinstance( obj, list):
379       retObj = []
380       for elt in obj:
381         retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
382       return retObj
383     else:
384       return obj
385
386 def UnProxyObjectSimpleLocal( obj ):
387   """
388   Method to be called in Local mode. Do not alterate the PyObj counter
389   """
390   if isinstance(obj,BigObjectOnDiskBase):
391     return obj.get()
392   elif isinstance( obj, list):
393     retObj = []
394     for elt in obj:
395       retObj.append( UnProxyObjectSimpleLocal(elt) )
396     return retObj
397   else:
398     return obj
399   
400 class FileHolder:
401   def __init__(self, fileName):
402     self._filename = fileName
403   @property
404   def filename(self):
405     return self._filename
406   
407 class FileDeleter(FileHolder):
408   def __init__(self, fileName):
409     super().__init__( fileName )
410   def __del__(self):
411     import os
412     if os.path.exists( self._filename ):
413       os.unlink( self._filename )
414
415 class MonitoringInfo:
416   def __init__(self, pyFileName, intervalInMs, outFileName, pid):
417     self._py_file_name = pyFileName
418     self._interval_in_ms = intervalInMs
419     self._out_file_name = outFileName
420     self._pid = pid
421
422   @property
423   def pyFileName(self):
424     return self._py_file_name
425
426   @property
427   def pid(self):
428     return self._pid
429   
430   @pid.setter
431   def pid(self, value):
432     self._pid = value
433
434   @property
435   def outFileName(self):
436     return self._out_file_name
437   
438   @property
439   def intervalInMs(self):
440     return self._interval_in_ms
441   
442 def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
443     """
444     This method loops indefinitely every intervalInMs milliseconds to scan 
445     number of inodes and size of content recursively included into the in input directory.
446
447     Args:
448     ----
449
450     outFileName (str) : name of file inside the results will be written. If None a new file is generated
451
452     See also CPUMemoryMonitoring
453     """
454     global orb
455     import os
456     dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
457     import tempfile
458     import logging
459     import KernelBasis
460     # outFileNameSave stores the content of outFileName during phase of dumping
461     with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
462       outFileNameSave = f.name
463     with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
464       tempPyFile = f.name
465     tempOutFile = outFileName
466     if tempOutFile is None:
467       tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
468     with open(tempPyFile,"w") as f:
469         f.write("""
470 import subprocess as sp
471 import re
472 import os
473 import time
474 import datetime
475 with open("{tempOutFile}","a") as f:
476   f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
477   f.write( "{{}}\\n".format( "{intervalInMs}" ) )
478   while(True):
479     nbinodes = -1
480     try:
481       nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]),  ), shell = True).decode().strip()
482     except:
483       pass
484     szOfDirStr = "fail"
485     try:
486       st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
487       szOfDirStr = re.split("[\s]+",st)[0]
488     except:
489       pass
490     f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
491     f.write( "{{}}\\n".format( str( nbinodes  ) ) )
492     f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
493     f.flush()
494     time.sleep( {intervalInMs} / 1000.0 )
495 """.format( **locals()))
496     logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
497     pyFileName = FileDeleter( tempPyFile )
498     if outFileName is None:
499       outFileName = FileDeleter( tempOutFile )
500     else:
501       outFileName = FileHolder(outFileName)
502     return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
503
504 def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
505   """
506   Launch a subprocess monitoring self process.
507   This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
508   CPU usage and RSS memory of the calling process.
509   Communication between subprocess and self is done by file.
510
511   Args:
512   ----
513     outFileName (str) : name of file inside the results will be written. If None a new file is generated
514
515   See also FileSystemMonitoring
516   """
517   import KernelBasis
518   def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
519     import os
520     import tempfile
521     with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
522       tempPyFile = f.name
523     tempOutFile = outFileName
524     if tempOutFile is None:
525       tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
526     pid = os.getpid()
527     with open(tempPyFile,"w") as f:
528       f.write("""import psutil
529 pid = {}
530 process = psutil.Process( pid )
531 import time
532 with open("{}","a") as f:
533   f.write( "{{}}\\n".format( "{}" ) )
534   while True:
535     f.write( "{{}}\\n".format( str( process.cpu_percent() ) ) )
536     f.write( "{{}}\\n".format( str( process.memory_info().rss  ) ) )
537     f.flush()
538     time.sleep( {} / 1000.0 )
539 """.format(pid, tempOutFile, intervalInMs, intervalInMs))
540     if outFileName is None:
541       autoOutFile = FileDeleter(tempOutFile)
542     else:
543       autoOutFile = FileHolder(tempOutFile)
544     return FileDeleter(tempPyFile),autoOutFile
545   pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
546   return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
547
548 class GenericPythonMonitoringLauncherCtxMgr:
549     def __init__(self, monitoringParams):
550         """
551         Args:
552         ----
553             monitoringParams (MonitoringInfo)
554         """
555         self._monitoring_params = monitoringParams
556     def __enter__(self):
557         import KernelBasis
558         pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
559         self._monitoring_params.pid = pid
560         return self._monitoring_params
561     
562     def __exit__(self,exctype, exc, tb):
563         StopMonitoring( self._monitoring_params )
564
565 def StopMonitoring( monitoringInfo ):
566   """
567   Kill monitoring subprocess.
568
569   Args:
570   ----
571       monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
572   """
573   import KernelBasis
574   KernelBasis.StopMonitoring(monitoringInfo.pid)
575
576 class CPUMemInfo:
577   def __init__(self, intervalInMs, cpu, mem_rss):
578     """
579     Args:
580     ----
581     intervalInMs (int)
582     cpu (list<float>)  CPU usage
583     mem_rss (list<int>) rss memory usage
584     """
585     self._interval_in_ms = intervalInMs
586     self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
587   def __str__(self):
588     st = """Interval in ms : {self.intervalInMs}
589 Data : ${self.data}
590 """.format( **locals() )
591     return st
592   @property
593   def intervalInMs(self):
594     return self._interval_in_ms
595   @property
596   def data(self):
597     """
598     list of triplets. First param of pair is cpu usage 
599                       Second param of pair is memory usage
600     """
601     return self._data
602
603 def ReadCPUMemInfoInternal( fileName ):
604   intervalInMs = 0
605   cpu = [] ; mem_rss = []
606   if os.path.exists( fileName ):
607     try:
608       with open(fileName, "r") as f:
609         coarseData = [ elt.strip() for elt in f.readlines() ]
610       intervalInMs = int( coarseData[0] )
611       coarseData = coarseData[1:]
612       cpu = [float(elt) for elt in coarseData[::2]]
613       mem_rss = [ int(elt) for elt in coarseData[1::2]]
614     except:
615       pass
616   return CPUMemInfo(intervalInMs,cpu,mem_rss)
617
618 def ReadCPUMemInfo( monitoringInfo ):
619   """
620   Retrieve CPU/Mem data of monitoring.
621
622   Args:
623   ----
624       monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
625   
626   Returns
627   -------
628     CPUMemInfo instance
629   """
630   return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
631
632 class InodeSizeInfo:
633   def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
634     """
635     Args:
636     ----
637     timeStamps (list<datetimestruct>)
638     nbInodes (list<int>)
639     volumeOfDir (list<str>)
640     """
641     self._dir_name_monitored = dirNameMonitored
642     self._interval_in_ms = intervalInMs
643     self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
644   def __str__(self):
645     st = """Filename monitored : {self.dirNameMonitored}
646 Interval in ms : ${self.intervalInMs}
647 Data : ${self.data}
648 """.format( **locals() )
649     return st
650   @property
651   def dirNameMonitored(self):
652     return self._dir_name_monitored
653   @property
654   def intervalInMs(self):
655     return self._interval_in_ms
656   @property
657   def data(self):
658     """
659     list of triplets. First param of triplet is datetimestruct
660                                       Second param of triplet is #inodes.
661                                       Thirst param of triplet is size.
662     """
663     return self._data
664
665 def ReadInodeSizeInfoInternal( fileName ):
666   import datetime
667   import os
668   with open(fileName, "r") as f:
669     coarseData = [ elt.strip() for elt in f.readlines() ]
670   dirNameMonitored = coarseData[0] ; intervalInMs = int( coarseData[1] ) ; coarseData = coarseData[2:]
671   tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
672   nbInodes = [int(elt) for elt in coarseData[1::3]]
673   volumeOfDir = coarseData[2::3]
674   return InodeSizeInfo(dirNameMonitored,intervalInMs,tss,nbInodes,volumeOfDir)
675
676 def ReadInodeSizeInfo( monitoringInfo ):
677   """
678   Retrieve nb of inodes and size of monitoring
679
680   Args:
681   ----
682       monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
683
684   Returns
685   -------
686     InodeSizeInfo
687   """
688   return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
689
690 class SeqByteReceiver:
691   # 2GB limit to trigger split into chunks
692   CHUNK_SIZE = 2000000000
693   def __init__(self,sender):
694     self._obj = sender
695   def __del__(self):
696     self._obj.UnRegister()
697     pass
698   def data(self):
699     size = self._obj.getSize()
700     if size <= SeqByteReceiver.CHUNK_SIZE:
701       return self.fetchOneShot( size )
702     else:
703       return self.fetchByChunks( size )
704   def fetchOneShot(self,size):
705     return self._obj.sendPart(0,size)
706   def fetchByChunks(self,size):
707       """
708       To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
709       """
710       data_for_split_case = bytes(0)
711       EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
712       iStart = 0 ; iEnd = EFF_CHUNK_SIZE
713       while iStart!=iEnd and iEnd <= size:
714         part = self._obj.sendPart(iStart,iEnd)
715         data_for_split_case = bytes(0).join( [data_for_split_case,part] )
716         iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
717       return data_for_split_case
718   
719 FinalCode = """import pickle
720 from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
721 import CORBA
722 import Engines
723 orb = CORBA.ORB_init([''])
724 codeFileName = "{}"
725 inputFileName = "{}"
726 outputFileName = "{}"
727 outputsKeys = {}
728 exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
729 with open(inputFileName,"rb") as f:
730   context = pickle.load( f )
731 with open(codeFileName,"r") as f:
732   code = f.read()
733 # go for execution
734 exec( code , context )
735 # filter part of context to be exported to father process
736 context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
737 #
738 with open(outputFileName,"wb") as f:
739   pickle.dump( context, f )
740 """
741
742 class PythonFunctionEvaluatorParams:
743   def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
744     self._main_filename = mainFileName
745     self._code_filename = codeFileName
746     self._in_context_filename = inContextFileName
747     self._out_context_filename = outContextFileName
748   @property
749   def result(self):
750     import pickle
751     with open(self._out_context_filename,"rb") as f:
752       return pickle.load( f )
753   def destroyOnOK(self):
754     for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
755       if os.path.exists( fileToDestroy ):
756         os.unlink( fileToDestroy )
757   def destroyOnKO(self, containerRef):
758      """
759      Called in the context of failure with replay mode activated
760      """
761      for fileToDestroy in [self._out_context_filename]:
762       if os.path.exists( fileToDestroy ):
763         os.unlink( fileToDestroy )
764       # register to container files group associated to the
765       containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
766   @property
767   def replayCmd(self):
768     return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))
769   
770   @property
771   def cleanOperations(self):
772     import os
773     return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )
774
775   def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
776     if returnCode == -1:
777       return f"return with non zero code ({returnCode})"
778     else:
779       banner = 200*"*"
780       if keepFilesToReplay:
781         return f"""return with non zero code ({returnCode})
782 {banner}
783 Looks like a hard crash as returnCode {returnCode} != 0
784 {self.replayCmd}
785 {self.cleanOperations}
786 {banner}
787 """
788       else:
789         return f"""return with non zero code ({returnCode})
790 {banner}
791 Looks like a hard crash as returnCode {returnCode} != 0
792 {banner}
793 """
794
795 def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay ):
796   """
797   Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.
798   
799   Args:
800   -----
801
802   code (str) : python code to be executed using context
803   context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
804   outargsname (list<str>) : list of arguments to be exported 
805   containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
806   instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
807   keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
808
809   Return:
810   -------
811
812   ScriptExecInfo : instance serverside
813
814   In/Out:
815   -------
816
817   context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
818   """
819   import tempfile
820   import pickle
821   import subprocess as sp
822   import CORBA
823   #
824   def InternalExecResistant( code, context, outargsname):
825     orb = CORBA.ORB_init([''])
826     iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
827     ####
828     EXEC_CODE_FNAME_PXF = "execsafe_"
829     def RetrieveUniquePartFromPfx( fname ):
830       return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
831     with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
832       codeFd.write( code )
833       codeFd.flush()
834       codeFileName = os.path.basename( codeFd.name )
835       contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) )
836       with open(contextFileName,"wb") as contextFd:
837         pickle.dump( context, contextFd)
838       resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) )
839       mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName  ) ) )
840       with open(mainExecFileName,"w") as f:
841         f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
842       p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
843       stdout, stderr = p.communicate()
844       returnCode = p.returncode
845     return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
846   ret = instanceOfLogOfCurrentSession._current_instance
847   returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
848   stdout = stdout.decode()
849   stderr = stderr.decode()
850   sys.stdout.write( stdout ) ; sys.stdout.flush()
851   sys.stderr.write( stderr ) ; sys.stderr.flush()
852   if returnCode == 0:
853     pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
854     if len(pcklData) > 0:
855       ret = pickle.loads( pcklData )
856     context.update( evParams.result )
857     evParams.destroyOnOK()
858     return ret
859   if returnCode != 0:
860     if keepFilesToReplay:
861       evParams.destroyOnKO( containerRef )
862     else:
863       evParams.destroyOnOK()
864     raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
865
866 def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
867   return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True)
868
869 def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
870   return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False)
871
872 def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
873   exec( code, context )
874   return instanceOfLogOfCurrentSession._current_instance
875
876 class LogOfCurrentExecutionSession:
877   def __init__(self, handleToCentralizedInst):
878     self._remote_handle = handleToCentralizedInst
879     self._current_instance = ScriptExecInfo()
880
881   def addFreestyleAndFlush(self, value):
882     self._current_instance.freestyle = value
883     self.finalizeAndPushToMaster()
884
885   def addInfoOnLevel2(self, key, value):
886     setattr(self._current_instance,key,value)
887
888   def finalizeAndPushToMaster(self):
889     self._remote_handle.assign( pickle.dumps( self._current_instance ) )
890
891 class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
892   """The implementation of the PyScriptNode CORBA IDL that executes a script"""
893   def __init__(self, nodeName, code, poa, my_container, logscript):
894     """Initialize the node : compilation in the local context"""
895     Generic.__init__(self,poa)
896     self.nodeName=nodeName
897     self.code=code
898     self.my_container_py = my_container
899     self.my_container=my_container._container
900     linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
901     self.ccode=compile(code,nodeName,'exec')
902     self.context={}
903     self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
904     self._log_script = logscript
905     self._current_execution_session = None
906     sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session
907
908   @abc.abstractmethod
909   def executeNow(self, outargsname):
910     raise RuntimeError("Must be overloaded")
911       
912   def __del__(self):
913     # force removal of self.context. Don t know why it s not done by default
914     self.removeAllVarsInContext()
915     pass
916
917   def getContainer(self):
918     return self.my_container
919
920   def getCode(self):
921     return self.code
922
923   def getName(self):
924     return self.nodeName
925
926   def defineNewCustomVar(self,varName,valueOfVar):
927     self.context[varName] = pickle.loads(valueOfVar)
928     pass
929
930   def executeAnotherPieceOfCode(self,code):
931     """Called for initialization of container lodging self."""
932     try:
933       ccode=compile(code,self.nodeName,'exec')
934       exec(ccode, self.context)
935     except Exception:
936       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
937
938   def assignNewCompiledCode(self,codeStr):
939     try:
940       self.code=codeStr
941       self.ccode=compile(codeStr,self.nodeName,'exec')
942     except Exception:
943       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))
944
945   def executeSimple(self, key, val):
946     """
947     Same as execute method except that no pickelization mecanism is implied here. No output is expected
948     """
949     try:
950       self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
951       exec(self.ccode,self.context)
952     except Exception:
953       exc_typ,exc_val,exc_fr=sys.exc_info()
954       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
955       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
956       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))
957     
958   def execute(self,outargsname,argsin):
959     """Execute the script stored in attribute ccode with pickled args (argsin)"""
960     try:
961       argsname,kws=pickle.loads(argsin)
962       self.context.update(kws)
963       exec(self.ccode, self.context)
964       argsout=[]
965       for arg in outargsname:
966         if arg not in self.context:
967           raise KeyError("There is no variable %s in context" % arg)
968         argsout.append(self.context[arg])
969       argsout=pickle.dumps(tuple(argsout),-1)
970       return argsout
971     except Exception:
972       exc_typ,exc_val,exc_fr=sys.exc_info()
973       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
974       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
975       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))
976
977   def executeFirst(self,argsin):
978     """ Same than first part of self.execute to reduce memory peak."""
979     def ArgInMananger(self,argsin):
980       argsInPy = SeqByteReceiver( argsin )
981       data = argsInPy.data()
982       self.addInfoOnLevel2("inputMem",len(data))
983       _,kws=pickle.loads(data)
984       return kws
985     try:
986       self.beginOfCurrentExecutionSession()
987       self.addTimeInfoOnLevel2("startInputTime")
988       # to force call of SeqByteReceiver's destructor
989       kws = ArgInMananger(self,argsin)
990       vis = InOutputObjVisitor()
991       for elt in kws:
992         # fetch real data if necessary
993         kws[elt] = UnProxyObjectSimple( kws[elt],vis)
994       self.addInfoOnLevel2("inputHDDMem",vis)
995       self.context.update(kws)
996       self.addTimeInfoOnLevel2("endInputTime")
997     except Exception:
998       exc_typ,exc_val,exc_fr=sys.exc_info()
999       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1000       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1001       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))
1002
1003   def executeSecond(self,outargsname):
1004     """ Same than second part of self.execute to reduce memory peak."""
1005     import sys
1006     try:
1007       self.addTimeInfoOnLevel2("startExecTime")
1008       ##
1009       self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
1010       with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams:
1011         self._current_execution_session._current_instance = self.executeNow( outargsname )
1012         cpumeminfo = ReadCPUMemInfo( monitoringParams )
1013       ##
1014       self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
1015       del monitoringParams
1016       self.addTimeInfoOnLevel2("endExecTime")
1017       self.addTimeInfoOnLevel2("startOutputTime")
1018       argsout=[]
1019       for arg in outargsname:
1020         if arg not in self.context:
1021           raise KeyError("There is no variable %s in context" % arg)
1022         argsout.append(self.context[arg])
1023       ret = [ ]
1024       outputMem = 0
1025       vis = InOutputObjVisitor()
1026       for arg in argsout:
1027         # the proxy mecanism is catched here
1028         argPickle = SpoolPickleObject( arg, vis )
1029         retArg = SenderByte_i( self.poa,argPickle )
1030         id_o = self.poa.activate_object(retArg)
1031         retObj = self.poa.id_to_reference(id_o)
1032         ret.append( retObj._narrow( SALOME.SenderByte ) )
1033         outputMem += len(argPickle)
1034       self.addInfoOnLevel2("outputMem",outputMem)
1035       self.addInfoOnLevel2("outputHDDMem",vis)
1036       self.addTimeInfoOnLevel2("endOutputTime")
1037       self.endOfCurrentExecutionSession()
1038       return ret
1039     except Exception:
1040       exc_typ,exc_val,exc_fr=sys.exc_info()
1041       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1042       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1043       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))
1044
1045   def listAllVarsInContext(self):
1046       import re
1047       pat = re.compile("^__([a-z]+)__$")
1048       return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]
1049       
1050   def removeAllVarsInContext(self):
1051       for elt in self.listAllVarsInContext():
1052         del self.context[elt]
1053
1054   def getValueOfVarInContext(self,varName):
1055     try:
1056       return pickle.dumps(self.context[varName],-1)
1057     except Exception:
1058       exc_typ,exc_val,exc_fr=sys.exc_info()
1059       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1060       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1061     pass
1062   
1063   def assignVarInContext(self, varName, value):
1064     try:
1065       self.context[varName][0] = pickle.loads(value)
1066     except Exception:
1067       exc_typ,exc_val,exc_fr=sys.exc_info()
1068       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1069       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1070     pass
1071
1072   def callMethodOnVarInContext(self, varName, methodName, args):
1073     try:
1074       return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
1075     except Exception:
1076       exc_typ,exc_val,exc_fr=sys.exc_info()
1077       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1078       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1079     pass
1080
1081   def beginOfCurrentExecutionSession(self):
1082     self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
1083     self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session
1084   
1085   def endOfCurrentExecutionSession(self):
1086     self._current_execution_session.finalizeAndPushToMaster()
1087     self._current_execution_session = None
1088
1089   def addInfoOnLevel2(self, key, value):
1090     self._current_execution_session.addInfoOnLevel2(key, value)
1091       
1092   def addTimeInfoOnLevel2(self, key):
1093     from datetime import datetime
1094     self._current_execution_session.addInfoOnLevel2(key,datetime.now())
1095
1096 class PyScriptNode_i(PyScriptNode_Abstract_i):
1097   def __init__(self, nodeName, code, poa, my_container, logscript):
1098     super().__init__(nodeName, code, poa, my_container, logscript)
1099
1100   def executeNow(self, outargsname):
1101     return ExecLocal(self.ccode,self.context,outargsname,self.my_container,self._current_execution_session)
1102     
1103 class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
1104   def __init__(self, nodeName, code, poa, my_container, logscript):
1105     super().__init__(nodeName, code, poa, my_container, logscript)
1106
1107   def executeNow(self, outargsname):
1108     return ExecCrashProofWithoutReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)
1109
1110 class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
1111   def __init__(self, nodeName, code, poa, my_container, logscript):
1112     super().__init__(nodeName, code, poa, my_container, logscript)
1113
1114   def executeNow(self, outargsname):
1115     return ExecCrashProofWithReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)