Salome HOME
[EDF29576] : destroy containers even if problem arises
[modules/yacs.git] / src / yacsloader / driver_internal.py
1 # -*- coding: utf-8 -*-
2 # Copyright (C) 2024  CEA, EDF
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #
20
21 import salome
22 import logging
23
24 DisplayEntryInCMD = "--display"
25 VerboseEntryInCMD = "--verbose"
26 StopOnErrorEntryInCMD = "--stop-on-error"
27 DumpOnErrorEntryInCMD = "--dump-on-error"
28 DumpEntryInCMD = "--dump"
29 KernelTraceEntryInCMD = "--kerneltrace"
30 DumpStateEntryInCMD = "--dump-final"
31 LoadStateEntryInCMD = "--load-state"
32 SaveXMLSchemaEntryInCMD = "--save-xml-schema"
33 ShutdownEntryInCMD = "--shutdown"
34 ResetEntryInCMD = "--reset"
35 InitPortEntryInCMD = "--init-port"
36 DoNotSqueezeEntryInCMD = "--donotsqueeze"
37 IOREntryInCMD = "--ior-ns"
38 CPUTimeResOfContainerEntryInCMD = "--cpu-mem-container-time-res"
39 HTOPFileEntryInCMD = "--htop-of-yacs-engine-process-file"
40 HTOPServerFileEntryInCMD = "--htop-of-servers"
41 HTOPFileTimeResEntryInCMD = "--htop-of-yacs-engine-process-time-res"
42 HTOPServerFileTimeResEntryInCMD = "--htop-of-servers-time-res"
43 MonitoringDirsEntryInCMD = "--monitoring-dirs-content"
44 MonitoringDirsResEntryInCMD = "--monitoring-dirs-content-res"
45 MonitoringDirsTimeResEntryInCMD = "--monitoring-dirs-content-time-res"
46
47 DisplayKeyInARGS = "display"
48 VerboseKeyInARGS = "verbose"
49 StopOnErrorKeyInARGS = "stop"
50 DumpOnErrorKeyInARGS = "dumpErrorFile"
51 DumpKeyInARGS = "dump"
52 KernelTraceKeyInARGS = "kerneltrace"
53 DumpStateKeyInARGS = "finalDump"
54 LoadStateKeyInARGS = "loadState"
55 SaveXMLSchemaKeyInARGS = "saveXMLSchema"
56 ShutdownKeyInARGS = "shutdown"
57 ResetKeyInARGS = "reset"
58 InitPortKeyInARGS = "init_port"
59 DoNotSqueezeKeyInARGS = "donotsqueeze"
60 IORKeyInARGS = "iorNS"
61 CPUTimeResOfContainerKeyInARGS = "cpu_mem_container_time_res"
62 HTOPFileKeyInARGS = "htop_of_yacs_engine_process_file"
63 HTOPServerFileKeyInARGS = "htop_of_servers"
64 HTOPFileTimeResKeyInARGS = "htop_of_yacs_engine_process_time_res"
65 HTOPServerFileTimeResKeyInARGS = "htop_of_servers_time_res"
66 MonitoringDirsInARGS = "monitoring_dirs_content"
67 MonitoringDirsResInARGS = "monitoring_dirs_content_res"
68 MonitoringDirsTimeResInARGS = "monitoring_dirs_content_time_res"
69
70 KeyValnARGS = [(DisplayEntryInCMD,DisplayKeyInARGS),
71                (VerboseEntryInCMD,VerboseKeyInARGS),
72                (StopOnErrorEntryInCMD,StopOnErrorKeyInARGS),
73                (DumpOnErrorEntryInCMD,DumpOnErrorKeyInARGS),
74                (DumpEntryInCMD,DumpKeyInARGS),
75                (KernelTraceEntryInCMD,KernelTraceKeyInARGS),
76                (DumpStateEntryInCMD,DumpStateKeyInARGS),
77                (LoadStateEntryInCMD,LoadStateKeyInARGS),
78                (SaveXMLSchemaEntryInCMD,SaveXMLSchemaKeyInARGS),
79                (ShutdownEntryInCMD,ShutdownKeyInARGS),
80                (ResetEntryInCMD,ResetKeyInARGS),
81                (InitPortEntryInCMD,InitPortKeyInARGS),
82                (DoNotSqueezeEntryInCMD,DoNotSqueezeKeyInARGS),
83                (CPUTimeResOfContainerEntryInCMD,CPUTimeResOfContainerKeyInARGS),
84                (HTOPFileEntryInCMD,HTOPFileKeyInARGS),
85                (HTOPFileTimeResEntryInCMD,HTOPFileTimeResKeyInARGS),
86                (HTOPServerFileEntryInCMD,HTOPServerFileKeyInARGS),
87                (HTOPServerFileTimeResEntryInCMD,HTOPServerFileTimeResKeyInARGS),
88                (MonitoringDirsEntryInCMD,MonitoringDirsInARGS),
89                (MonitoringDirsResEntryInCMD,MonitoringDirsResInARGS),
90                (MonitoringDirsTimeResEntryInCMD,MonitoringDirsTimeResInARGS),
91                (IOREntryInCMD,IORKeyInARGS)]
92
93 my_runtime_yacs = None
94
95 my_ior_ns = None
96
97 def initializeSALOME():
98   import SALOMERuntime
99   global my_runtime_yacs
100   if my_runtime_yacs:
101     return
102   salome.salome_init()
103   if my_ior_ns:
104     salome.naming_service.DumpIORInFile( my_ior_ns )
105   flags = SALOMERuntime.RuntimeSALOME.UsePython + SALOMERuntime.RuntimeSALOME.UseCorba + SALOMERuntime.RuntimeSALOME.UseXml + SALOMERuntime.RuntimeSALOME.UseCpp + SALOMERuntime.RuntimeSALOME.UseSalome
106   SALOMERuntime.RuntimeSALOME.setRuntime( flags )
107   my_runtime_yacs = SALOMERuntime.getSALOMERuntime()
108   anIOR = salome.orb.object_to_string ( salome.modulcat )
109   aCatalog = my_runtime_yacs.loadCatalog( "session", anIOR )
110   my_runtime_yacs.addCatalog( aCatalog )
111
112 def SALOMEInitializationNeeded(func):
113   def decaratedFunc(*args,**kwargs):
114     initializeSALOME()
115     return func(*args,**kwargs)
116   return decaratedFunc
117
118 @SALOMEInitializationNeeded
119 def loadGraph( xmlFileName ):
120   """
121   Args:
122   -----
123   xmlFileName : XML file containing YACS schema
124
125   Returns
126   -------
127
128   SALOMERuntime.SalomeProc : YACS graph instance
129   """
130   import loader
131   l=loader.YACSLoader()
132   p=l.load( xmlFileName )
133   return p
134
135 def patchGraph( proc, squeezeMemory, initPorts, xmlSchema, loadStateXmlFile, reset, display):
136   """
137   Args:
138   -----
139
140   proc ( SALOMERuntime.SalomeProc ) : YACS Proc instance to be evaluated
141   squeezeMemory ( bool ) : squeezememory to be activated
142   initPorts (list<string>) : list of bloc.node.port=value.
143   xmlSchema (string) :
144   loadStateXmlFile (string) : file if any of state to be loaded inside proc
145   reset (int) : 
146   display (int) :
147   """
148   import SALOMERuntime
149   import loader
150   import pilot
151   def parse_init_port(input):
152     """
153     Returns
154     -------
155     node, port, value
156     """
157     node_port, value = input.split("=")
158     nodePortSpl = node_port.split(".")
159     port = nodePortSpl[-1]
160     node = ".".join( nodePortSpl[:-1] )
161     return node,port,value
162       
163   if squeezeMemory:
164     logging.info("SqueezeMemory requested -> update proc")
165     allNodes = proc.getAllRecursiveNodes()
166     for node in allNodes:
167       if isinstance(proc,SALOMERuntime.PythonNode):
168         node.setSqueezeStatus( True )
169   #
170   for initPort in initPorts:
171       node,port,value = parse_init_port(initPort)
172       init_state = proc.setInPortValue(node, port, value)
173       if init_state != value:
174         raise RuntimeError(f"Error on initialization of {initPort}")
175   #
176   if xmlSchema:
177     SALOMERuntime.VisitorSaveSalomeSchemaUnsafe(proc,xmlSchema)
178     pass
179   #
180   info = pilot.LinkInfo( pilot.LinkInfo.ALL_DONT_STOP )
181   proc.checkConsistency(info)
182   if info.areWarningsOrErrors():
183     raise RuntimeError( info.getGlobalRepr() )
184   #
185   if loadStateXmlFile:
186     loader.loadState( proc, loadStateXmlFile )
187     if reset > 0:
188       proc.resetState(reset)
189       proc.exUpdateState()
190   #
191   if display > 0:
192       proc.writeDotInFile("toto")
193          
194 @SALOMEInitializationNeeded
195 def prepareExecution(proc, isStop, dumpErrorFile):
196   """
197   Returns
198   -------
199
200   pilot.ExecutorSwig : Instance of executor
201   """
202   import pilot
203   ex=pilot.ExecutorSwig()
204   if isStop:
205     logging.info(f"Stop has been activated with {dumpErrorFile}")
206     ex.setStopOnError( dumpErrorFile!="", dumpErrorFile )
207   return ex
208
209 @SALOMEInitializationNeeded
210 def executeGraph( executor, xmlfilename, proc, dump, finalDump, display, shutdown, CPUMemContainerTimeRes,
211                  HTopOfThisProcessFile, HTopTimeRes,
212                  HTopOfAllServersFile, HTopOfAllServersTimeRes, DirectoriesToMonitor):
213   """
214   Args:
215   -----
216
217   executor (pilot.ExecutorSwig) : Executor in charge of evaluation.
218   proc ( SALOMERuntime.SalomeProc ) : YACS Proc instance to be evaluated
219   xmlfilename (string)
220   dump (int) : time interval between 2 dump state
221   finalDump ( string ) : filename containing final result of graph, if any.
222   display (int) :
223   shutdown (int) : shutdown level
224   CPUMemContainerTimeRes (int) : time in second between two measures of CPU/Mem in container processes
225   HTopOfThisProcessFile (str) : file name (if not empty) containing the result of measure of current process
226   HTopTimeRes (int) : time in second between two measures of CPU/Mem of current process
227   HTopOfAllServersFile (str) : file name (if not empty) containing the result of measure of all servers
228   HTopOfAllServersTimeRes (int) : time in second between two measures of CPU/Mem of any of server
229   """
230   import SALOMERuntime
231   dump_thread = None
232   import pilot
233   import os
234   import contextlib
235
236   class AutoShutdown:
237     def __init__(self, proc, shutdown):
238       self._proc = proc
239       self._shutdown = shutdown
240     def __enter__(self):
241       pass
242     
243     def __exit__(self,exctype, exc, tb):
244       if self._shutdown < 999:
245         self._proc.shutdown(self._shutdown)
246       salome.dsm.shutdownScopes()
247       my_runtime_yacs.fini( False )
248   
249   def MonitoringDirectories( DirectoriesToMonitor ):
250     import SALOME_PyNode
251     if len( DirectoriesToMonitor ) > 0:
252       return [ SALOME_PyNode.GenericPythonMonitoringLauncherCtxMgr( SALOME_PyNode.FileSystemMonitoring(timeRes*1000,zeDir,zeDirRes) ) for zeDir,zeDirRes,timeRes in DirectoriesToMonitor ]
253     else:
254       return [ contextlib.nullcontext() ]
255
256   def MonitoringThisProcess(HTopOfThisProcessFile,HTopTimeRes):
257     import SALOME_PyNode
258     if HTopOfThisProcessFile:
259       return SALOME_PyNode.GenericPythonMonitoringLauncherCtxMgr( SALOME_PyNode.CPUMemoryMonitoring(1000*HTopTimeRes,HTopOfThisProcessFile) )
260     else:
261       return contextlib.nullcontext()
262     
263   def MonitoringAllKernelServers(HTopOfAllServersFile, HTopOfAllServersTimeRes):
264     if HTopOfAllServersFile:
265       return salome.LogManagerLaunchMonitoringFileCtxMgr( 1000*HTopOfAllServersTimeRes, HTopOfAllServersFile )
266     else:
267       return contextlib.nullcontext()
268   #
269   if dump != 0:
270     dumpFile = "dumpState_{}".format( os.path.basename(xmlfilename) )
271     lockFile = "{}.lock".format( os.path.splitext( os.path.basename(xmlfilename) )[0] )
272     dump_thread = SALOMERuntime.ThreadDumpState(proc,dump,dumpFile,lockFile)
273     dump_thread.start()
274   #
275   salome.cm.SetDeltaTimeBetweenCPUMemMeasureInMilliSecond( 1000*CPUMemContainerTimeRes )
276   #
277   ctxManagers = MonitoringDirectories( DirectoriesToMonitor ) + [ MonitoringThisProcess(HTopOfThisProcessFile, HTopTimeRes) ] + [ MonitoringAllKernelServers(HTopOfAllServersFile, HTopOfAllServersTimeRes) ]
278   
279   with AutoShutdown(proc,shutdown):
280     with contextlib.ExitStack() as stack:
281       for mgr in ctxManagers:
282         stack.enter_context(mgr)
283       executor.RunPy(proc,display,isPyThread=True,fromscratch=True) # same as RunW but releasing GIL
284     #
285     if dump_thread:
286         dump_thread.join()
287     #
288     if proc.getEffectiveState() != pilot.DONE:
289       raise RuntimeError( proc.getErrorReport() )
290   #
291   if display > 0:
292       proc.writeDotInFile("titi")
293   #
294   if finalDump:
295     logging.info(f"Final dump requested : {finalDump}")
296     SALOMERuntime.schemaSaveStateUnsafe( proc, finalDump )
297
298 def EntryFromCoarseEntry( entry ):
299   if entry[:2] != "--":
300     raise RuntimeError("Unexpected entry")
301   return entry[2:]
302
303 def toDict( args ):
304   """
305   Convert argparse.Namespace to dict
306   """
307   return {EntryFromCoarseEntry(entry):getattr(args,key) for entry,key in KeyValnARGS}
308
309 def reprAfterArgParsing( args ):
310   """
311   Args:
312   -----
313
314   args (argparse.Namespace) : instance after parsing
315   """
316   return "\n".join( [ f"{EntryFromCoarseEntry(entry)} : {args[key]}" for entry,key in KeyValnARGS ] )
317
318 def getArgumentParser():
319   import argparse
320   parser = argparse.ArgumentParser()
321   parser.add_argument('xmlfilename',help = "XML file containing YACS schema to be executed")
322   parser.add_argument("-d", DisplayEntryInCMD, dest = DisplayKeyInARGS, type=int, const=1, nargs='?', default=0, help="Display dot files: 0=never to 3=very often")
323   parser.add_argument("-v", VerboseEntryInCMD, dest = VerboseKeyInARGS,help="Produce verbose output", action='store_true')
324   parser.add_argument("-s",StopOnErrorEntryInCMD,dest=StopOnErrorKeyInARGS,help="Stop on first error", action='store_true')
325   parser.add_argument("-e",DumpOnErrorEntryInCMD,dest=DumpOnErrorKeyInARGS, type=str, const='dumpErrorState.xml', default="", nargs='?', help="Stop on first error and dump state")
326   parser.add_argument("-g",DumpEntryInCMD,dest=DumpKeyInARGS, type=int, const=60, default=0, nargs='?', help="dump state")
327   parser.add_argument("-kt", KernelTraceEntryInCMD, dest = KernelTraceKeyInARGS,help="Produce verbose of SALOME/KERNEL", action='store_true')
328   parser.add_argument("-f",DumpStateEntryInCMD, dest =DumpStateKeyInARGS, type=str, const='finalDumpState.xml', default="", nargs='?', help="dump final state")
329   parser.add_argument("-l",LoadStateEntryInCMD, dest=LoadStateKeyInARGS, type=str, default="", help="Load State from a previous partial execution")
330   parser.add_argument("-x",SaveXMLSchemaEntryInCMD, dest=SaveXMLSchemaKeyInARGS, type=str, const="saveSchema.xml", nargs='?', default="", help = "dump xml schema")
331   parser.add_argument("-t",ShutdownEntryInCMD, dest = ShutdownKeyInARGS, type=int , default=3, help="Shutdown the schema: 0=no shutdown to 3=full shutdown")
332   parser.add_argument("-r",ResetEntryInCMD, dest = ResetKeyInARGS, type=int , default = 0, help="Reset the schema before execution: 0=nothing, 1=reset error nodes to ready state")
333   parser.add_argument("-i",InitPortEntryInCMD, dest = InitPortKeyInARGS, type=str, default =[], action='append', help="Initialisation value of a port, specified as bloc.node.port=value. For multiple settings use comma.")
334   parser.add_argument("-z",DoNotSqueezeEntryInCMD, dest = DoNotSqueezeKeyInARGS, help = "Desactivate squeeze memory optimization.", action='store_true')
335   parser.add_argument(CPUTimeResOfContainerEntryInCMD, dest = CPUTimeResOfContainerKeyInARGS, type=int, default = 10, help="Time in second between two measures of CPU/Mem in container processes")
336   parser.add_argument(HTOPFileEntryInCMD, dest = HTOPFileKeyInARGS, type=str, default ="", help="File name (if not empty) containing the result of measure of current process")
337   parser.add_argument(HTOPFileTimeResEntryInCMD, dest = HTOPFileTimeResKeyInARGS, type=int, default = 60, help="Time in second between between two measures of CPU/Mem of current process")
338   parser.add_argument(HTOPServerFileEntryInCMD, dest = HTOPServerFileKeyInARGS, type=str, default ="", help="File name (if not empty) containing the result of measure of all server processes")
339   parser.add_argument(HTOPServerFileTimeResEntryInCMD, dest = HTOPServerFileTimeResKeyInARGS, type=int, default = 30, help="Time in second between between two measures of CPU/Mem of any server process")
340   parser.add_argument(MonitoringDirsEntryInCMD, dest = MonitoringDirsInARGS, nargs='+', type=str, default =[], help="List of directories to be monitored")
341   parser.add_argument(MonitoringDirsResEntryInCMD, dest = MonitoringDirsResInARGS, nargs='+', type=str, default =[], help=f"List of files with result of monitoring of directories to be monitored (see {MonitoringDirsInARGS}). The size of lists are expected to be the same.")
342   parser.add_argument(MonitoringDirsTimeResEntryInCMD, dest = MonitoringDirsTimeResInARGS, nargs='+', type=int, default =[], help=f"List of time resolution (in second) of monitoring of directories to be monitored (see {MonitoringDirsInARGS}). The size of lists are expected to be the same.")
343   parser.add_argument(IOREntryInCMD, dest = IORKeyInARGS, type=str, default ="", help="file inside which the ior of NS will be stored")
344   parser.add_argument("--options_from_json", dest = "options_from_json", type=str, default ="", help="Json file of options. If defined options in json will override those specified in command line.")
345   return parser
346
347 def mainRun( args, xmlFileName):
348   """
349   Args:
350   -----
351
352   args (dict) : options for treatment
353
354   """
355   global my_ior_ns
356   from salome_utils import positionVerbosityOfLoggerRegardingState,setVerboseLevel,setVerbose
357   #
358   iorNS = args[IORKeyInARGS]
359   #
360   if iorNS:
361     my_ior_ns = iorNS
362   #
363   if args[VerboseKeyInARGS]:
364     setVerbose( args[ KernelTraceKeyInARGS ] )
365     setVerboseLevel(logging.INFO)
366     positionVerbosityOfLoggerRegardingState()
367     logging.info( reprAfterArgParsing(args) )
368   #
369   proc = loadGraph( xmlFileName )
370   patchGraph( proc, not args[DoNotSqueezeKeyInARGS], args[InitPortKeyInARGS], args[SaveXMLSchemaKeyInARGS], args[LoadStateKeyInARGS], args[ResetKeyInARGS], args[DisplayKeyInARGS])
371   executor = prepareExecution( proc, args[StopOnErrorKeyInARGS], args[DumpOnErrorKeyInARGS])
372   #
373   executeGraph( executor, xmlFileName, proc, args[DumpKeyInARGS], args[DumpStateKeyInARGS], args[DisplayKeyInARGS], args[ShutdownKeyInARGS], args[CPUTimeResOfContainerKeyInARGS],
374                args[HTOPFileKeyInARGS], args[HTOPFileTimeResKeyInARGS],
375                args[HTOPServerFileKeyInARGS], args[HTOPServerFileTimeResKeyInARGS], [(dirToMonitor,resFile,timeRes) for dirToMonitor,resFile,timeRes in zip(args[MonitoringDirsInARGS],args[MonitoringDirsResInARGS],args[MonitoringDirsTimeResInARGS])] )
376
377 def parseArgs():
378   """
379   Returns
380   -------
381
382   - args (dict) : dictionnary containing all args taken into account. If json, the params in json will override entries
383   - xmlFileName (str) : XML YACS schema
384   
385   """
386   import json
387   parser = getArgumentParser()
388   args = parser.parse_args()
389   iorNS = args.iorNS
390   xmlFileName = args.xmlfilename
391   optionFromJSon = args.options_from_json
392   args = toDict( args )
393   if optionFromJSon:
394     # in case of Json overrides 
395     with open( optionFromJSon ) as f:
396       opts_from_json = json.load( f )
397     for k,v in opts_from_json.items():
398       if k != EntryFromCoarseEntry(IOREntryInCMD) or v:# for IOR if v is null -> do not override
399         args[k] = v
400   # change key of args from entryCMD to KeyInARGS
401   args = {key:args[EntryFromCoarseEntry(entry)] for entry,key in KeyValnARGS}
402   return args, xmlFileName
403
404 if __name__ == "__main__":
405   args, xmlFileName = parseArgs()
406   mainRun( args, xmlFileName)