]> SALOME platform Git repositories - modules/kernel.git/blob - src/Container/SALOME_PyNode.py
Salome HOME
add options-help get from launcher
[modules/kernel.git] / src / Container / SALOME_PyNode.py
1 #  -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024  CEA, EDF, OPEN CASCADE
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #
20
21 #  File   : SALOME_PyNode.py
22 #  Author : Christian CAREMOLI, EDF
23 #  Module : SALOME
24 #  $Header$
25 #
26 import sys,traceback
27 import linecache
28 import pickle
29 import Engines__POA
30 import SALOME__POA
31 import SALOME
32 import logging
33 import KernelBasis
34 import abc
35 import os
36 import sys
37 from SALOME_ContainerHelper import ScriptExecInfo
38
39 MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
40
41 MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
42
43 class Generic(SALOME__POA.GenericObj):
44   """A Python implementation of the GenericObj CORBA IDL"""
45   def __init__(self,poa):
46     self.poa=poa
47     self.cnt=1
48
49   def Register(self):
50     #print("Register called : %d"%self.cnt)
51     self.cnt+=1
52
53   def UnRegister(self):
54     #print("UnRegister called : %d"%self.cnt)
55     self.cnt-=1
56     if self.cnt <= 0:
57       oid=self.poa.servant_to_id(self)
58       self.poa.deactivate_object(oid)
59
60   def Destroy(self):
61     print("WARNING SALOME::GenericObj::Destroy() function is obsolete! Use UnRegister() instead.")
62     self.UnRegister()
63
64   def __del__(self):
65     #print("Destuctor called")
66     pass
67
68 class PyNode_i (Engines__POA.PyNode,Generic):
69   """The implementation of the PyNode CORBA IDL"""
70   def __init__(self, nodeName,code,poa,my_container):
71     """Initialize the node : compilation in the local context"""
72     Generic.__init__(self,poa)
73     self.nodeName=nodeName
74     self.code=code
75     self.my_container=my_container._container
76     linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
77     ccode=compile(code,nodeName,'exec')
78     self.context={}
79     self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
80     exec(ccode, self.context)
81
82   def getContainer(self):
83     return self.my_container
84
85   def getCode(self):
86     return self.code
87
88   def getName(self):
89     return self.nodeName
90
91   def defineNewCustomVar(self,varName,valueOfVar):
92     self.context[varName] = pickle.loads(valueOfVar)
93     pass
94
95   def executeAnotherPieceOfCode(self,code):
96     """Called for initialization of container lodging self."""
97     try:
98       ccode=compile(code,self.nodeName,'exec')
99       exec(ccode, self.context)
100     except Exception:
101       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
102
103   def execute(self,funcName,argsin):
104     """Execute the function funcName found in local context with pickled args (argsin)"""
105     try:
106       argsin,kws=pickle.loads(argsin)
107       func=self.context[funcName]
108       argsout=func(*argsin,**kws)
109       argsout=pickle.dumps(argsout,-1)
110       return argsout
111     except Exception:
112       exc_typ,exc_val,exc_fr=sys.exc_info()
113       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
114       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
115
116 class SenderByte_i(SALOME__POA.SenderByte,Generic):
117   def __init__(self,poa,bytesToSend):
118     Generic.__init__(self,poa)
119     self.bytesToSend = bytesToSend
120
121   def getSize(self):
122     return len(self.bytesToSend)
123
124   def sendPart(self,n1,n2):
125     return self.bytesToSend[n1:n2]
126
127 DicoForProxyFile = { }
128
129 def GetSizeOfBufferedReader(f):
130   """
131   This method returns in bytes size of a file openned.
132
133   Args:
134   ----
135       f (io.IOBase): buffered reader returned by open
136       
137   Returns
138   -------
139       int: number of bytes
140   """
141   import io
142   pos = f.tell()
143   f.seek(0,io.SEEK_END)
144   pos2 = f.tell()
145   f.seek(pos,io.SEEK_SET)
146   return pos2-pos
147
148 def GetObjectFromFile(fname, visitor = None):
149   with open(fname,"rb") as f:
150     if visitor:
151       visitor.setHDDMem( GetSizeOfBufferedReader(f) )
152       visitor.setFileName( fname )
153     obj = pickle.load(f)
154   return obj
155
156 def DumpInFile(obj,fname):
157   with open(fname,"wb") as f:
158     f.write( obj )
159
160 def IncrRefInFile(fname):
161   if fname in DicoForProxyFile:
162     DicoForProxyFile[fname] += 1
163   else:
164     DicoForProxyFile[fname] = 2
165   pass
166
167 def DecrRefInFile(fname):
168   if fname not in DicoForProxyFile:
169     cnt = 1
170   else:
171     cnt = DicoForProxyFile[fname]
172     DicoForProxyFile[fname] -= 1
173     if cnt == 1:
174       del DicoForProxyFile[fname]
175   if cnt == 1:
176     if os.path.exists(fname):
177       os.unlink( fname )
178   pass
179
180 def GetBigObjectOnDiskThreshold():
181     return KernelBasis.GetBigObjOnDiskThreshold()
182
183 def ActivateProxyMecanismOrNot( sizeInByte ):
184   thres = GetBigObjectOnDiskThreshold()
185   if thres == -1:
186     return False
187   else:
188     return sizeInByte > thres
189
190 def GetBigObjectDirectory():
191   import os
192   if not KernelBasis.BigObjOnDiskDirectoryDefined():
193     raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
194   return os.path.expanduser( os.path.expandvars( KernelBasis.GetBigObjOnDiskDirectory() ) )
195
196 def GetBigObjectFileName():
197   """
198   Return a filename in the most secure manner (see tempfile documentation)
199   """
200   import tempfile
201   with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
202     ret = f.name
203   return ret
204
205 class BigObjectOnDiskBase:
206   def __init__(self, fileName, objSerialized):
207     """
208     :param fileName: the file used to dump into.
209     :param objSerialized: the object in pickeled form
210     :type objSerialized: bytes
211     """
212     self._filename = fileName
213     # attribute _destroy is here to tell client side or server side
214     # only client side can be with _destroy set to True. server side due to risk of concurrency
215     # so pickled form of self must be done with this attribute set to False.
216     self._destroy = False
217     self.__dumpIntoFile(objSerialized)
218
219   def getDestroyStatus(self):
220     return self._destroy
221
222   def incrRef(self):
223     if self._destroy:
224       IncrRefInFile( self._filename )
225     else:
226       # should never happen !
227       RuntimeError("Invalid call to incrRef !")
228
229   def decrRef(self):
230     if self._destroy:
231       DecrRefInFile( self._filename )
232     else:
233       # should never happen !
234       RuntimeError("Invalid call to decrRef !")
235
236   def unlinkOnDestructor(self):
237     self._destroy = True
238
239   def doNotTouchFile(self):
240     """
241     Method called slave side. The life cycle management of file is client side not slave side.
242     """
243     self._destroy = False
244
245   def __del__(self):
246     if self._destroy:
247       DecrRefInFile( self._filename )
248
249   def getFileName(self):
250     return self._filename
251   
252   def __dumpIntoFile(self, objSerialized):
253     DumpInFile( objSerialized, self._filename )
254
255   def get(self, visitor = None):
256     obj = GetObjectFromFile( self._filename, visitor )
257     return obj
258
259   def __float__(self):
260     return float( self.get() )
261     
262   def __int__(self):
263     return int( self.get() )
264     
265   def __str__(self):
266     obj = self.get()
267     if isinstance(obj,str):
268         return obj
269     else:
270         raise RuntimeError("Not a string")
271       
272 class BigObjectOnDisk(BigObjectOnDiskBase):
273   def __init__(self, fileName, objSerialized):
274     BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
275     
276 class BigObjectOnDiskListElement(BigObjectOnDiskBase):
277   def __init__(self, pos, length, fileName):
278     self._filename = fileName
279     self._destroy = False
280     self._pos = pos
281     self._length = length
282
283   def get(self, visitor = None):
284     fullObj = BigObjectOnDiskBase.get(self, visitor)
285     return fullObj[ self._pos ]
286     
287   def __getitem__(self, i):
288     return self.get()[i]
289
290   def __len__(self):
291     return len(self.get())
292     
293 class BigObjectOnDiskSequence(BigObjectOnDiskBase):
294   def __init__(self, length, fileName, objSerialized):
295     BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
296     self._length = length
297
298   def __getitem__(self, i):
299     return BigObjectOnDiskListElement(i, self._length, self.getFileName())
300
301   def __len__(self):
302     return self._length
303
304 class BigObjectOnDiskList(BigObjectOnDiskSequence):
305   def __init__(self, length, fileName, objSerialized):
306     BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
307     
308 class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
309   def __init__(self, length, fileName, objSerialized):
310     BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
311
312 def ProxyfyPickeled( obj, pickleObjInit = None, visitor = None ):
313   """
314   This method return a proxy instance of pickled form of object given in input.
315
316   Args:
317   ----
318       obj (pickelable type) : object to be proxified
319       pickleObjInit (bytes) : Optionnal. Original pickeled form of object to be proxyfied if already computed. If not this method generate it
320
321   Returns
322   -------
323       BigObjectOnDiskBase: proxy instance
324   """
325   pickleObj = pickleObjInit
326   if pickleObj is None:
327     pickleObj = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
328   fileName = GetBigObjectFileName()
329   if visitor:
330     visitor.setHDDMem( len(pickleObj) )
331     visitor.setFileName(fileName)
332   if isinstance( obj, list):
333     proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
334   elif isinstance( obj, tuple):
335     proxyObj = BigObjectOnDiskTuple( len(obj), fileName , pickleObj )
336   else:
337     proxyObj = BigObjectOnDisk( fileName , pickleObj )
338   return proxyObj
339
340 def SpoolPickleObject( obj, visitor = None ):
341   import pickle
342   with InOutputObjVisitorCM(visitor) as v:
343     pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
344     if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
345       return pickleObjInit
346     else:
347       proxyObj = ProxyfyPickeled( obj, pickleObjInit, v.visitor() )
348       pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
349       return pickleProxy
350
351 from SALOME_ContainerHelper import InOutputObjVisitorCM, InOutputObjVisitor
352
353 def UnProxyObjectSimple( obj, visitor = None ):
354   """
355   Method to be called in Remote mode. Alterate the obj _status attribute. 
356   Because the slave process does not participate in the reference counting
357   
358   Args:
359   ----
360       visitor (InOutputObjVisitor): A visitor to keep track of amount of memory on chip and those on HDD
361
362   """
363   with InOutputObjVisitorCM(visitor) as v:
364     logging.debug( "UnProxyObjectSimple {}".format(type(obj)) )
365     if isinstance(obj,BigObjectOnDiskBase):
366       obj.doNotTouchFile()
367       return obj.get( v )
368     elif isinstance( obj, list):
369       retObj = []
370       for elt in obj:
371         retObj.append( UnProxyObjectSimple(elt,v.visitor()) )
372       return retObj
373     else:
374       return obj
375
376 def UnProxyObjectSimpleLocal( obj ):
377   """
378   Method to be called in Local mode. Do not alterate the PyObj counter
379   """
380   if isinstance(obj,BigObjectOnDiskBase):
381     return obj.get()
382   elif isinstance( obj, list):
383     retObj = []
384     for elt in obj:
385       retObj.append( UnProxyObjectSimpleLocal(elt) )
386     return retObj
387   else:
388     return obj
389   
390 class FileHolder:
391   def __init__(self, fileName):
392     self._filename = fileName
393   @property
394   def filename(self):
395     return self._filename
396   
397 class FileDeleter(FileHolder):
398   def __init__(self, fileName):
399     super().__init__( fileName )
400   def __del__(self):
401     import os
402     if os.path.exists( self._filename ):
403       os.unlink( self._filename )
404
405 class MonitoringInfo:
406   def __init__(self, pyFileName, intervalInMs, outFileName, pid):
407     self._py_file_name = pyFileName
408     self._interval_in_ms = intervalInMs
409     self._out_file_name = outFileName
410     self._pid = pid
411
412   @property
413   def pyFileName(self):
414     return self._py_file_name
415
416   @property
417   def pid(self):
418     return self._pid
419   
420   @pid.setter
421   def pid(self, value):
422     self._pid = value
423
424   @property
425   def outFileName(self):
426     return self._out_file_name
427   
428   @property
429   def intervalInMs(self):
430     return self._interval_in_ms
431   
432 def FileSystemMonitoring(intervalInMs, dirNameToInspect, outFileName = None):
433     """
434     This method loops indefinitely every intervalInMs milliseconds to scan 
435     number of inodes and size of content recursively included into the in input directory.
436
437     Args:
438     ----
439
440     outFileName (str) : name of file inside the results will be written. If None a new file is generated
441
442     See also CPUMemoryMonitoring
443     """
444     global orb
445     import os
446     dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
447     import tempfile
448     import logging
449     import KernelBasis
450     # outFileNameSave stores the content of outFileName during phase of dumping
451     with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".txt") as f:
452       outFileNameSave = f.name
453     with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
454       tempPyFile = f.name
455     tempOutFile = outFileName
456     if tempOutFile is None:
457       tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
458     with open(tempPyFile,"w") as f:
459         f.write("""
460 import subprocess as sp
461 import re
462 import os
463 import time
464 import datetime
465 with open("{tempOutFile}","a") as f:
466   f.write( "{{}}\\n".format( "{dirNameToInspect2}" ) )
467   f.write( "{{}}\\n".format( "{intervalInMs}" ) )
468   while(True):
469     nbinodes = -1
470     try:
471       nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find","{dirNameToInspect2}"]),  ), shell = True).decode().strip()
472     except:
473       pass
474     szOfDirStr = "fail"
475     try:
476       st = sp.check_output(["du","-sh","{dirNameToInspect2}"]).decode()
477       szOfDirStr = re.split("[\s]+",st)[0]
478     except:
479       pass
480     f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
481     f.write( "{{}}\\n".format( str( nbinodes  ) ) )
482     f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
483     f.flush()
484     time.sleep( {intervalInMs} / 1000.0 )
485 """.format( **locals()))
486     logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
487     pyFileName = FileDeleter( tempPyFile )
488     if outFileName is None:
489       outFileName = FileDeleter( tempOutFile )
490     else:
491       outFileName = FileHolder(outFileName)
492     return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
493
494 def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
495   """
496   Launch a subprocess monitoring self process.
497   This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
498   CPU usage and RSS memory of the calling process.
499   Communication between subprocess and self is done by file.
500
501   Args:
502   ----
503     outFileName (str) : name of file inside the results will be written. If None a new file is generated
504
505   See also FileSystemMonitoring
506   """
507   import KernelBasis
508   def BuildPythonFileForCPUPercent( intervalInMs, outFileName):
509     import os
510     import tempfile
511     with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
512       tempPyFile = f.name
513     tempOutFile = outFileName
514     if tempOutFile is None:
515       tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
516     pid = os.getpid()
517     with open(tempPyFile,"w") as f:
518       f.write("""import psutil
519 pid = {}
520 process = psutil.Process( pid )
521 def getChargeOf( p ):
522   a,b = p.cpu_percent(), p.memory_info().rss
523   try:
524     for c in p.children():
525       a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
526   except:
527     pass
528   return a,b
529 import time
530 with open("{}","a") as f:
531   f.write( "{{}}\\n".format( "{}" ) )
532   while True:
533     cpu,mem_rss = getChargeOf( process )
534     f.write( "{{}}\\n".format( str( cpu ) ) )
535     f.write( "{{}}\\n".format( str( mem_rss  ) ) )
536     f.flush()
537     time.sleep( {} / 1000.0 )
538 """.format(pid, tempOutFile, intervalInMs, intervalInMs))
539     if outFileName is None:
540       autoOutFile = FileDeleter(tempOutFile)
541     else:
542       autoOutFile = FileHolder(tempOutFile)
543     return FileDeleter(tempPyFile),autoOutFile
544   pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs, outFileName )
545   return MonitoringInfo(pyFileName, intervalInMs, outFileName, None)
546
547 class GenericPythonMonitoringLauncherCtxMgr:
548     def __init__(self, monitoringParams):
549         """
550         Args:
551         ----
552             monitoringParams (MonitoringInfo)
553         """
554         self._monitoring_params = monitoringParams
555     def __enter__(self):
556         import KernelBasis
557         pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
558         self._monitoring_params.pid = pid
559         return self._monitoring_params
560     
561     def __exit__(self,exctype, exc, tb):
562         StopMonitoring( self._monitoring_params )
563
564 def StopMonitoring( monitoringInfo ):
565   """
566   Kill monitoring subprocess.
567
568   Args:
569   ----
570       monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
571   """
572   import KernelBasis
573   KernelBasis.StopMonitoring(monitoringInfo.pid)
574
575 class CPUMemInfo:
576   def __init__(self, intervalInMs, cpu, mem_rss):
577     """
578     Args:
579     ----
580     intervalInMs (int)
581     cpu (list<float>)  CPU usage
582     mem_rss (list<int>) rss memory usage
583     """
584     self._interval_in_ms = intervalInMs
585     self._data = [(a,b) for a,b in zip(cpu,mem_rss)]
586   def __str__(self):
587     st = """Interval in ms : {self.intervalInMs}
588 Data : ${self.data}
589 """.format( **locals() )
590     return st
591   @property
592   def intervalInMs(self):
593     return self._interval_in_ms
594   @property
595   def data(self):
596     """
597     list of triplets. First param of pair is cpu usage 
598                       Second param of pair is memory usage
599     """
600     return self._data
601
602 def ReadCPUMemInfoInternal( fileName ):
603   intervalInMs = 0
604   cpu = [] ; mem_rss = []
605   if os.path.exists( fileName ):
606     try:
607       with open(fileName, "r") as f:
608         coarseData = [ elt.strip() for elt in f.readlines() ]
609       intervalInMs = int( coarseData[0] )
610       coarseData = coarseData[1:]
611       cpu = [float(elt) for elt in coarseData[::2]]
612       mem_rss = [ int(elt) for elt in coarseData[1::2]]
613     except:
614       pass
615   return CPUMemInfo(intervalInMs,cpu,mem_rss)
616
617 def ReadCPUMemInfo( monitoringInfo ):
618   """
619   Retrieve CPU/Mem data of monitoring.
620
621   Args:
622   ----
623       monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
624   
625   Returns
626   -------
627     CPUMemInfo instance
628   """
629   return ReadCPUMemInfoInternal( monitoringInfo.outFileName.filename )
630
631 class InodeSizeInfo:
632   def __init__(self, dirNameMonitored, intervalInMs, timeStamps, nbInodes, volumeOfDir):
633     """
634     Args:
635     ----
636     timeStamps (list<datetimestruct>)
637     nbInodes (list<int>)
638     volumeOfDir (list<str>)
639     """
640     self._dir_name_monitored = dirNameMonitored
641     self._interval_in_ms = intervalInMs
642     self._data = [(t,a,b) for t,a,b in zip(timeStamps,nbInodes,volumeOfDir)]
643   def __str__(self):
644     st = """Filename monitored : {self.dirNameMonitored}
645 Interval in ms : ${self.intervalInMs}
646 Data : ${self.data}
647 """.format( **locals() )
648     return st
649   @property
650   def dirNameMonitored(self):
651     return self._dir_name_monitored
652   @property
653   def intervalInMs(self):
654     return self._interval_in_ms
655   @property
656   def data(self):
657     """
658     list of triplets. First param of triplet is datetimestruct
659                                       Second param of triplet is #inodes.
660                                       Thirst param of triplet is size.
661     """
662     return self._data
663
664 def ReadInodeSizeInfoInternal( fileName ):
665   import datetime
666   import os
667   with open(fileName, "r") as f:
668     coarseData = [ elt.strip() for elt in f.readlines() ]
669   dirNameMonitored = coarseData[0] ; intervalInMs = int( coarseData[1] ) ; coarseData = coarseData[2:]
670   tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
671   nbInodes = [int(elt) for elt in coarseData[1::3]]
672   volumeOfDir = coarseData[2::3]
673   return InodeSizeInfo(dirNameMonitored,intervalInMs,tss,nbInodes,volumeOfDir)
674
675 def ReadInodeSizeInfo( monitoringInfo ):
676   """
677   Retrieve nb of inodes and size of monitoring
678
679   Args:
680   ----
681       monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
682
683   Returns
684   -------
685     InodeSizeInfo
686   """
687   return ReadInodeSizeInfoInternal( monitoringInfo.outFileName.filename )
688
689 class SeqByteReceiver:
690   # 2GB limit to trigger split into chunks
691   CHUNK_SIZE = 2000000000
692   def __init__(self,sender):
693     self._obj = sender
694   def __del__(self):
695     self._obj.UnRegister()
696     pass
697   def data(self):
698     size = self._obj.getSize()
699     if size <= SeqByteReceiver.CHUNK_SIZE:
700       return self.fetchOneShot( size )
701     else:
702       return self.fetchByChunks( size )
703   def fetchOneShot(self,size):
704     return self._obj.sendPart(0,size)
705   def fetchByChunks(self,size):
706       """
707       To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
708       """
709       data_for_split_case = bytes(0)
710       EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
711       iStart = 0 ; iEnd = EFF_CHUNK_SIZE
712       while iStart!=iEnd and iEnd <= size:
713         part = self._obj.sendPart(iStart,iEnd)
714         data_for_split_case = bytes(0).join( [data_for_split_case,part] )
715         iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
716       return data_for_split_case
717   
718 FinalCode = """import pickle
719 from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
720 import CORBA
721 import Engines
722 orb = CORBA.ORB_init([''])
723 codeFileName = "{}"
724 inputFileName = "{}"
725 outputFileName = "{}"
726 outputsKeys = {}
727 exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
728 with open(inputFileName,"rb") as f:
729   context = pickle.load( f )
730 context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
731 with open(codeFileName,"r") as f:
732   code = f.read()
733 # go for execution
734 exec( code , context )
735 # filter part of context to be exported to father process
736 context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
737 #
738 with open(outputFileName,"wb") as f:
739   pickle.dump( context, f )
740 """
741
742 class PythonFunctionEvaluatorParams:
743   def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
744     self._main_filename = mainFileName
745     self._code_filename = codeFileName
746     self._in_context_filename = inContextFileName
747     self._out_context_filename = outContextFileName
748   @property
749   def result(self):
750     import pickle
751     with open(self._out_context_filename,"rb") as f:
752       return pickle.load( f )
753   def destroyOnOK(self):
754     for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
755       if os.path.exists( fileToDestroy ):
756         os.unlink( fileToDestroy )
757   def destroyOnKO(self, containerRef):
758      """
759      Called in the context of failure with replay mode activated
760      """
761      for fileToDestroy in [self._out_context_filename]:
762       if os.path.exists( fileToDestroy ):
763         os.unlink( fileToDestroy )
764       # register to container files group associated to the
765       containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
766   @property
767   def replayCmd(self):
768     return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))
769   
770   @property
771   def cleanOperations(self):
772     import os
773     return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )
774
775   def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
776     if returnCode == -1:
777       return f"return with non zero code ({returnCode})"
778     else:
779       banner = 200*"*"
780       if keepFilesToReplay:
781         return f"""return with non zero code ({returnCode})
782 {banner}
783 Looks like a hard crash as returnCode {returnCode} != 0
784 {self.replayCmd}
785 {self.cleanOperations}
786 {banner}
787 """
788       else:
789         return f"""return with non zero code ({returnCode})
790 {banner}
791 Looks like a hard crash as returnCode {returnCode} != 0
792 {banner}
793 """
794
795 def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay ):
796   """
797   Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.
798   
799   Args:
800   -----
801
802   code (str) : python code to be executed using context
803   context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
804   outargsname (list<str>) : list of arguments to be exported 
805   containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
806   instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
807   keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
808
809   Return:
810   -------
811
812   ScriptExecInfo : instance serverside
813
814   In/Out:
815   -------
816
817   context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
818   """
819   import tempfile
820   import pickle
821   import subprocess as sp
822   import CORBA
823   #
824   def InternalExecResistant( code, context, outargsname):
825     orb = CORBA.ORB_init([''])
826     iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
827     ####
828     EXEC_CODE_FNAME_PXF = "execsafe_"
829     def RetrieveUniquePartFromPfx( fname ):
830       return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
831     with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
832       codeFd.write( code )
833       codeFd.flush()
834       codeFileName = os.path.basename( codeFd.name )
835       contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) )
836       with open(contextFileName,"wb") as contextFd:
837         pickle.dump( context, contextFd)
838       resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) )
839       mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName  ) ) )
840       with open(mainExecFileName,"w") as f:
841         f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
842       p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
843       stdout, stderr = p.communicate()
844       returnCode = p.returncode
845     return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
846   ret = instanceOfLogOfCurrentSession._current_instance
847   returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
848   stdout = stdout.decode()
849   stderr = stderr.decode()
850   sys.stdout.write( stdout ) ; sys.stdout.flush()
851   sys.stderr.write( stderr ) ; sys.stderr.flush()
852   if returnCode == 0:
853     pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
854     if len(pcklData) > 0:
855       ret = pickle.loads( pcklData )
856     context.update( evParams.result )
857     evParams.destroyOnOK()
858     return ret
859   if returnCode != 0:
860     if keepFilesToReplay:
861       evParams.destroyOnKO( containerRef )
862     else:
863       evParams.destroyOnOK()
864     raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
865
866 def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
867   return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True)
868
869 def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
870   return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False)
871
872 def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
873   exec( code, context )
874   return instanceOfLogOfCurrentSession._current_instance
875
876 class LogOfCurrentExecutionSessionAbs(abc.ABC):
877   def __init__(self):
878     self._current_instance = ScriptExecInfo()
879
880   def addInfoOnLevel2(self, key, value):
881     setattr(self._current_instance,key,value)
882
883   @abc.abstractmethod
884   def addFreestyleAndFlush(self, value):
885     raise RuntimeError("Must be overloaded")
886
887 class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
888   def __init__(self, handleToCentralizedInst):
889     super().__init__()
890     self._remote_handle = handleToCentralizedInst
891
892   def addFreestyleAndFlush(self, value):
893     self._current_instance.freestyle = value
894     self.finalizeAndPushToMaster()
895
896   def finalizeAndPushToMaster(self):
897     self._remote_handle.assign( pickle.dumps( self._current_instance ) )
898
899 class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
900   """
901   This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
902   """
903   def __init__(self, handleToCentralizedInst = None):
904     super().__init__()
905   def addFreestyleAndFlush(self, value):
906     pass
907
908 class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
909   """The implementation of the PyScriptNode CORBA IDL that executes a script"""
910   def __init__(self, nodeName, code, poa, my_container, logscript):
911     """Initialize the node : compilation in the local context"""
912     Generic.__init__(self,poa)
913     self.nodeName=nodeName
914     self.code=code
915     self.my_container_py = my_container
916     self.my_container=my_container._container
917     linecache.cache[nodeName]=0,None,code.split('\n'),nodeName
918     self.ccode=compile(code,nodeName,'exec')
919     self.context={}
920     self.context[MY_CONTAINER_ENTRY_IN_GLBS] = self.my_container
921     self._log_script = logscript
922     self._current_execution_session = None
923     sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session
924
925   @abc.abstractmethod
926   def executeNow(self, outargsname):
927     raise RuntimeError("Must be overloaded")
928       
929   def __del__(self):
930     # force removal of self.context. Don t know why it s not done by default
931     self.removeAllVarsInContext()
932     pass
933
934   def getContainer(self):
935     return self.my_container
936
937   def getCode(self):
938     return self.code
939
940   def getName(self):
941     return self.nodeName
942
943   def defineNewCustomVar(self,varName,valueOfVar):
944     self.context[varName] = pickle.loads(valueOfVar)
945     pass
946
947   def executeAnotherPieceOfCode(self,code):
948     """Called for initialization of container lodging self."""
949     try:
950       ccode=compile(code,self.nodeName,'exec')
951       exec(ccode, self.context)
952     except Exception:
953       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode (%s) : code to be executed \"%s\"" %(self.nodeName,code),0))
954
955   def assignNewCompiledCode(self,codeStr):
956     try:
957       self.code=codeStr
958       self.ccode=compile(codeStr,self.nodeName,'exec')
959     except Exception:
960       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"","PyScriptNode.assignNewCompiledCode (%s) : code to be executed \"%s\"" %(self.nodeName,codeStr),0))
961
962   def executeSimple(self, key, val):
963     """
964     Same as execute method except that no pickelization mecanism is implied here. No output is expected
965     """
966     try:
967       self.context.update({ "env" : [(k,v) for k,v in zip(key,val)]})
968       exec(self.ccode,self.context)
969     except Exception:
970       exc_typ,exc_val,exc_fr=sys.exc_info()
971       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
972       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
973       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" % (self.nodeName),0))
974     
975   def execute(self,outargsname,argsin):
976     """Execute the script stored in attribute ccode with pickled args (argsin)"""
977     try:
978       argsname,kws=pickle.loads(argsin)
979       self.context.update(kws)
980       exec(self.ccode, self.context)
981       argsout=[]
982       for arg in outargsname:
983         if arg not in self.context:
984           raise KeyError("There is no variable %s in context" % arg)
985         argsout.append(self.context[arg])
986       argsout=pickle.dumps(tuple(argsout),-1)
987       return argsout
988     except Exception:
989       exc_typ,exc_val,exc_fr=sys.exc_info()
990       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
991       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
992       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s, outargsname: %s" % (self.nodeName,outargsname),0))
993
994   def executeFirst(self,argsin):
995     """ Same than first part of self.execute to reduce memory peak."""
996     def ArgInMananger(self,argsin):
997       argsInPy = SeqByteReceiver( argsin )
998       data = argsInPy.data()
999       self.addInfoOnLevel2("inputMem",len(data))
1000       _,kws=pickle.loads(data)
1001       return kws
1002     try:
1003       self.beginOfCurrentExecutionSession()
1004       self.addTimeInfoOnLevel2("startInputTime")
1005       # to force call of SeqByteReceiver's destructor
1006       kws = ArgInMananger(self,argsin)
1007       vis = InOutputObjVisitor()
1008       for elt in kws:
1009         # fetch real data if necessary
1010         kws[elt] = UnProxyObjectSimple( kws[elt],vis)
1011       self.addInfoOnLevel2("inputHDDMem",vis)
1012       self.context.update(kws)
1013       self.addTimeInfoOnLevel2("endInputTime")
1014     except Exception:
1015       exc_typ,exc_val,exc_fr=sys.exc_info()
1016       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1017       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1018       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:First %s" % (self.nodeName),0))
1019
1020   def executeSecond(self,outargsname):
1021     """ Same than second part of self.execute to reduce memory peak."""
1022     import sys
1023     try:
1024       self.addTimeInfoOnLevel2("startExecTime")
1025       ##
1026       self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
1027       with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams:
1028         self._current_execution_session._current_instance = self.executeNow( outargsname )
1029         cpumeminfo = ReadCPUMemInfo( monitoringParams )
1030       ##
1031       self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
1032       del monitoringParams
1033       self.addTimeInfoOnLevel2("endExecTime")
1034       self.addTimeInfoOnLevel2("startOutputTime")
1035       argsout=[]
1036       for arg in outargsname:
1037         if arg not in self.context:
1038           raise KeyError("There is no variable %s in context" % arg)
1039         argsout.append(self.context[arg])
1040       ret = [ ]
1041       outputMem = 0
1042       vis = InOutputObjVisitor()
1043       for arg in argsout:
1044         # the proxy mecanism is catched here
1045         argPickle = SpoolPickleObject( arg, vis )
1046         retArg = SenderByte_i( self.poa,argPickle )
1047         id_o = self.poa.activate_object(retArg)
1048         retObj = self.poa.id_to_reference(id_o)
1049         ret.append( retObj._narrow( SALOME.SenderByte ) )
1050         outputMem += len(argPickle)
1051       self.addInfoOnLevel2("outputMem",outputMem)
1052       self.addInfoOnLevel2("outputHDDMem",vis)
1053       self.addTimeInfoOnLevel2("endOutputTime")
1054       self.endOfCurrentExecutionSession()
1055       return ret
1056     except Exception:
1057       exc_typ,exc_val,exc_fr=sys.exc_info()
1058       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1059       print("".join(l)) ; sys.stdout.flush() # print error also in logs of remote container
1060       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode:Second %s, outargsname: %s" % (self.nodeName,outargsname),0))
1061
1062   def listAllVarsInContext(self):
1063       import re
1064       pat = re.compile("^__([a-z]+)__$")
1065       return [elt for elt in self.context if not pat.match(elt) and elt != MY_CONTAINER_ENTRY_IN_GLBS]
1066       
1067   def removeAllVarsInContext(self):
1068       for elt in self.listAllVarsInContext():
1069         del self.context[elt]
1070
1071   def getValueOfVarInContext(self,varName):
1072     try:
1073       return pickle.dumps(self.context[varName],-1)
1074     except Exception:
1075       exc_typ,exc_val,exc_fr=sys.exc_info()
1076       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1077       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1078     pass
1079   
1080   def assignVarInContext(self, varName, value):
1081     try:
1082       self.context[varName][0] = pickle.loads(value)
1083     except Exception:
1084       exc_typ,exc_val,exc_fr=sys.exc_info()
1085       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1086       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1087     pass
1088
1089   def callMethodOnVarInContext(self, varName, methodName, args):
1090     try:
1091       return pickle.dumps( getattr(self.context[varName][0],methodName)(*pickle.loads(args)),-1 )
1092     except Exception:
1093       exc_typ,exc_val,exc_fr=sys.exc_info()
1094       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
1095       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyScriptNode: %s" %self.nodeName,0))
1096     pass
1097
1098   def beginOfCurrentExecutionSession(self):
1099     self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
1100     self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session
1101   
1102   def endOfCurrentExecutionSession(self):
1103     self._current_execution_session.finalizeAndPushToMaster()
1104     self._current_execution_session = None
1105
1106   def addInfoOnLevel2(self, key, value):
1107     self._current_execution_session.addInfoOnLevel2(key, value)
1108       
1109   def addTimeInfoOnLevel2(self, key):
1110     from datetime import datetime
1111     self._current_execution_session.addInfoOnLevel2(key,datetime.now())
1112
1113 class PyScriptNode_i(PyScriptNode_Abstract_i):
1114   def __init__(self, nodeName, code, poa, my_container, logscript):
1115     super().__init__(nodeName, code, poa, my_container, logscript)
1116
1117   def executeNow(self, outargsname):
1118     return ExecLocal(self.ccode,self.context,outargsname,self.my_container,self._current_execution_session)
1119     
1120 class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
1121   def __init__(self, nodeName, code, poa, my_container, logscript):
1122     super().__init__(nodeName, code, poa, my_container, logscript)
1123
1124   def executeNow(self, outargsname):
1125     return ExecCrashProofWithoutReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)
1126
1127 class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
1128   def __init__(self, nodeName, code, poa, my_container, logscript):
1129     super().__init__(nodeName, code, poa, my_container, logscript)
1130
1131   def executeNow(self, outargsname):
1132     return ExecCrashProofWithReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)