]> SALOME platform Git repositories - modules/yacs.git/blob - src/yacsloader/driver_internal.py
Salome HOME
Fix driver when there are no tasks to launch.
[modules/yacs.git] / src / yacsloader / driver_internal.py
1 # -*- coding: utf-8 -*-
2 # Copyright (C) 2024  CEA, EDF
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #
20
21 import salome
22 import logging
23
24 DisplayEntryInCMD = "--display"
25 VerboseEntryInCMD = "--verbose"
26 StopOnErrorEntryInCMD = "--stop-on-error"
27 DumpOnErrorEntryInCMD = "--dump-on-error"
28 DumpEntryInCMD = "--dump"
29 KernelTraceEntryInCMD = "--kerneltrace"
30 DumpStateEntryInCMD = "--dump-final"
31 LoadStateEntryInCMD = "--load-state"
32 SaveXMLSchemaEntryInCMD = "--save-xml-schema"
33 ShutdownEntryInCMD = "--shutdown"
34 ResetEntryInCMD = "--reset"
35 InitPortEntryInCMD = "--init-port"
36 DoNotSqueezeEntryInCMD = "--donotsqueeze"
37 IOREntryInCMD = "--ior-ns"
38 CPUTimeResOfContainerEntryInCMD = "--cpu-mem-container-time-res"
39 HTOPFileEntryInCMD = "--htop-of-yacs-engine-process-file"
40 HTOPServerFileEntryInCMD = "--htop-of-servers"
41 HTOPFileTimeResEntryInCMD = "--htop-of-yacs-engine-process-time-res"
42 HTOPServerFileTimeResEntryInCMD = "--htop-of-servers-time-res"
43 MonitoringDirsEntryInCMD = "--monitoring-dirs-content"
44 MonitoringDirsResEntryInCMD = "--monitoring-dirs-content-res"
45 MonitoringDirsTimeResEntryInCMD = "--monitoring-dirs-content-time-res"
46 ReplayOnErrorEntryInCMD = "--replay-on-error"
47
48 DisplayKeyInARGS = "display"
49 VerboseKeyInARGS = "verbose"
50 StopOnErrorKeyInARGS = "stop"
51 DumpOnErrorKeyInARGS = "dumpErrorFile"
52 DumpKeyInARGS = "dump"
53 KernelTraceKeyInARGS = "kerneltrace"
54 DumpStateKeyInARGS = "finalDump"
55 LoadStateKeyInARGS = "loadState"
56 SaveXMLSchemaKeyInARGS = "saveXMLSchema"
57 ShutdownKeyInARGS = "shutdown"
58 ResetKeyInARGS = "reset"
59 InitPortKeyInARGS = "init_port"
60 DoNotSqueezeKeyInARGS = "donotsqueeze"
61 IORKeyInARGS = "iorNS"
62 CPUTimeResOfContainerKeyInARGS = "cpu_mem_container_time_res"
63 HTOPFileKeyInARGS = "htop_of_yacs_engine_process_file"
64 HTOPServerFileKeyInARGS = "htop_of_servers"
65 HTOPFileTimeResKeyInARGS = "htop_of_yacs_engine_process_time_res"
66 HTOPServerFileTimeResKeyInARGS = "htop_of_servers_time_res"
67 MonitoringDirsInARGS = "monitoring_dirs_content"
68 MonitoringDirsResInARGS = "monitoring_dirs_content_res"
69 MonitoringDirsTimeResInARGS = "monitoring_dirs_content_time_res"
70 ReplayOnErrorEntryInARGS = "replay_on_error"
71
72 KeyValnARGS = [(DisplayEntryInCMD,DisplayKeyInARGS),
73                (VerboseEntryInCMD,VerboseKeyInARGS),
74                (StopOnErrorEntryInCMD,StopOnErrorKeyInARGS),
75                (DumpOnErrorEntryInCMD,DumpOnErrorKeyInARGS),
76                (DumpEntryInCMD,DumpKeyInARGS),
77                (KernelTraceEntryInCMD,KernelTraceKeyInARGS),
78                (DumpStateEntryInCMD,DumpStateKeyInARGS),
79                (LoadStateEntryInCMD,LoadStateKeyInARGS),
80                (SaveXMLSchemaEntryInCMD,SaveXMLSchemaKeyInARGS),
81                (ShutdownEntryInCMD,ShutdownKeyInARGS),
82                (ResetEntryInCMD,ResetKeyInARGS),
83                (InitPortEntryInCMD,InitPortKeyInARGS),
84                (DoNotSqueezeEntryInCMD,DoNotSqueezeKeyInARGS),
85                (CPUTimeResOfContainerEntryInCMD,CPUTimeResOfContainerKeyInARGS),
86                (HTOPFileEntryInCMD,HTOPFileKeyInARGS),
87                (HTOPFileTimeResEntryInCMD,HTOPFileTimeResKeyInARGS),
88                (HTOPServerFileEntryInCMD,HTOPServerFileKeyInARGS),
89                (HTOPServerFileTimeResEntryInCMD,HTOPServerFileTimeResKeyInARGS),
90                (MonitoringDirsEntryInCMD,MonitoringDirsInARGS),
91                (MonitoringDirsResEntryInCMD,MonitoringDirsResInARGS),
92                (MonitoringDirsTimeResEntryInCMD,MonitoringDirsTimeResInARGS),
93                (ReplayOnErrorEntryInCMD,ReplayOnErrorEntryInARGS),
94                (IOREntryInCMD,IORKeyInARGS)]
95
96 my_runtime_yacs = None
97
98 my_ior_ns = None
99
100 my_replay_on_error = False
101
102 def initializeSALOME():
103   import SALOMERuntime
104   import KernelBasis
105   global my_runtime_yacs,my_ior_ns,my_runtime_yacs
106   if my_runtime_yacs:
107     return
108   salome.salome_init()
109   if my_replay_on_error:
110     KernelBasis.SetPyExecutionMode("OutOfProcessWithReplay")
111   if my_ior_ns:
112     salome.naming_service.DumpIORInFile( my_ior_ns )
113   flags = SALOMERuntime.RuntimeSALOME.UsePython + SALOMERuntime.RuntimeSALOME.UseCorba + SALOMERuntime.RuntimeSALOME.UseXml + SALOMERuntime.RuntimeSALOME.UseCpp + SALOMERuntime.RuntimeSALOME.UseSalome
114   SALOMERuntime.RuntimeSALOME.setRuntime( flags )
115   my_runtime_yacs = SALOMERuntime.getSALOMERuntime()
116   anIOR = salome.orb.object_to_string ( salome.modulcat )
117   aCatalog = my_runtime_yacs.loadCatalog( "session", anIOR )
118   my_runtime_yacs.addCatalog( aCatalog )
119
120 def SALOMEInitializationNeeded(func):
121   def decaratedFunc(*args,**kwargs):
122     initializeSALOME()
123     return func(*args,**kwargs)
124   return decaratedFunc
125
126 @SALOMEInitializationNeeded
127 def loadGraph( xmlFileName ):
128   """
129   Args:
130   -----
131   xmlFileName : XML file containing YACS schema
132
133   Returns
134   -------
135
136   SALOMERuntime.SalomeProc : YACS graph instance
137   """
138   import loader
139   l=loader.YACSLoader()
140   p=l.load( xmlFileName )
141   return p
142
143 def patchGraph( proc, squeezeMemory, initPorts, xmlSchema, loadStateXmlFile, reset, display):
144   """
145   Args:
146   -----
147
148   proc ( SALOMERuntime.SalomeProc ) : YACS Proc instance to be evaluated
149   squeezeMemory ( bool ) : squeezememory to be activated
150   initPorts (list<string>) : list of bloc.node.port=value.
151   xmlSchema (string) :
152   loadStateXmlFile (string) : file if any of state to be loaded inside proc
153   reset (int) : 
154   display (int) :
155   """
156   import SALOMERuntime
157   import loader
158   import pilot
159   def parse_init_port(input):
160     """
161     Returns
162     -------
163     node, port, value
164     """
165     node_port, value = input.split("=")
166     nodePortSpl = node_port.split(".")
167     port = nodePortSpl[-1]
168     node = ".".join( nodePortSpl[:-1] )
169     return node,port,value
170       
171   if squeezeMemory:
172     logging.info("SqueezeMemory requested -> update proc")
173     allNodes = proc.getAllRecursiveNodes()
174     for node in allNodes:
175       if isinstance(proc,SALOMERuntime.PythonNode):
176         node.setSqueezeStatus( True )
177   #
178   for initPort in initPorts:
179       node,port,value = parse_init_port(initPort)
180       init_state = proc.setInPortValue(node, port, value)
181       if init_state != value:
182         raise RuntimeError(f"Error on initialization of {initPort}")
183   #
184   if xmlSchema:
185     SALOMERuntime.VisitorSaveSalomeSchemaUnsafe(proc,xmlSchema)
186     pass
187   #
188   info = pilot.LinkInfo( pilot.LinkInfo.ALL_DONT_STOP )
189   proc.checkConsistency(info)
190   if info.areWarningsOrErrors():
191     raise RuntimeError( info.getGlobalRepr() )
192   #
193   if loadStateXmlFile:
194     loader.loadState( proc, loadStateXmlFile )
195     if reset > 0:
196       proc.resetState(reset)
197       proc.exUpdateState()
198   #
199   if display > 0:
200       proc.writeDotInFile("toto")
201          
202 @SALOMEInitializationNeeded
203 def prepareExecution(proc, isStop, dumpErrorFile):
204   """
205   Returns
206   -------
207
208   pilot.ExecutorSwig : Instance of executor
209   """
210   import pilot
211   ex=pilot.ExecutorSwig()
212   if isStop:
213     logging.info(f"Stop has been activated with {dumpErrorFile}")
214     ex.setStopOnError( dumpErrorFile!="", dumpErrorFile )
215   return ex
216
217 @SALOMEInitializationNeeded
218 def executeGraph( executor, xmlfilename, proc, dump, finalDump, display, shutdown, CPUMemContainerTimeRes,
219                  HTopOfThisProcessFile, HTopTimeRes,
220                  HTopOfAllServersFile, HTopOfAllServersTimeRes, DirectoriesToMonitor):
221   """
222   Args:
223   -----
224
225   executor (pilot.ExecutorSwig) : Executor in charge of evaluation.
226   proc ( SALOMERuntime.SalomeProc ) : YACS Proc instance to be evaluated
227   xmlfilename (string)
228   dump (int) : time interval between 2 dump state
229   finalDump ( string ) : filename containing final result of graph, if any.
230   display (int) :
231   shutdown (int) : shutdown level
232   CPUMemContainerTimeRes (int) : time in second between two measures of CPU/Mem in container processes
233   HTopOfThisProcessFile (str) : file name (if not empty) containing the result of measure of current process
234   HTopTimeRes (int) : time in second between two measures of CPU/Mem of current process
235   HTopOfAllServersFile (str) : file name (if not empty) containing the result of measure of all servers
236   HTopOfAllServersTimeRes (int) : time in second between two measures of CPU/Mem of any of server
237   """
238   import SALOMERuntime
239   import pilot
240   import os
241   import contextlib
242
243   class AutoShutdown:
244     def __init__(self, proc, shutdown):
245       self._proc = proc
246       self._shutdown = shutdown
247     def __enter__(self):
248       pass
249     
250     def __exit__(self,exctype, exc, tb):
251       if my_replay_on_error:
252         listOfGrps = []
253         for cont in salome.get_all_containers():
254           listOfGrps += cont.getAllLogFileNameGroups()
255         print("{} {} {}".format( 100*"=", "List of replay sessions of failing usecases" , 100*"="))
256         for igrp,grp in enumerate(listOfGrps):
257           print("{} : {}".format("Group {}".format(igrp)," ".join(grp)))
258         print(300*"=")
259       #
260       if self._shutdown < 999:
261         self._proc.shutdown(self._shutdown)
262       salome.dsm.shutdownScopes()
263       my_runtime_yacs.fini( False )
264
265   class AutoDumpThread:
266     def __init__(self, proc, dump, xmlfilename):
267       self._dumpFile = "dumpState_{}".format( os.path.basename(xmlfilename) )
268       self._lockFile = "{}.lock".format( os.path.splitext( os.path.basename(xmlfilename) )[0] )
269     def __enter__(self):
270       logging.info(f"Ready to launch thread of state dump with  dumpFile = {self._dumpFile}  lockFile = {self._lockFile}")
271       self._dump_thread = SALOMERuntime.ThreadDumpState(proc,dump,self._dumpFile,self._lockFile)
272       self._dump_thread.start()
273     def __exit__(self,exctype, exc, tb):
274       self._dump_thread.join()
275   
276   def MonitoringDirectories( DirectoriesToMonitor ):
277     import SALOME_PyNode
278     if len( DirectoriesToMonitor ) > 0:
279       return [ SALOME_PyNode.GenericPythonMonitoringLauncherCtxMgr( SALOME_PyNode.FileSystemMonitoring(timeRes*1000,zeDir,zeDirRes) ) for zeDir,zeDirRes,timeRes in DirectoriesToMonitor ]
280     else:
281       return [ ]
282
283   def MonitoringThisProcess(HTopOfThisProcessFile,HTopTimeRes):
284     import SALOME_PyNode
285     if HTopOfThisProcessFile:
286       return [ SALOME_PyNode.GenericPythonMonitoringLauncherCtxMgr( SALOME_PyNode.CPUMemoryMonitoring(1000*HTopTimeRes,HTopOfThisProcessFile) ) ]
287     else:
288       return [ ]
289     
290   def MonitoringAllKernelServers(HTopOfAllServersFile, HTopOfAllServersTimeRes):
291     if HTopOfAllServersFile:
292       return [ salome.LogManagerLaunchMonitoringFileCtxMgr( 1000*HTopOfAllServersTimeRes, HTopOfAllServersFile ) ]
293     else:
294       return [ ]
295
296   #
297   salome.cm.SetDeltaTimeBetweenCPUMemMeasureInMilliSecond( 1000*CPUMemContainerTimeRes )
298   # Start part of context manager instances
299   ctxManagers = [ AutoShutdown(proc,shutdown) ] # the first one must be this one. Because orb.shutdown must be called last !
300   #
301   ctxManagers += MonitoringDirectories( DirectoriesToMonitor ) + MonitoringThisProcess(HTopOfThisProcessFile, HTopTimeRes) + MonitoringAllKernelServers(HTopOfAllServersFile, HTopOfAllServersTimeRes)
302   #
303   if dump != 0:
304     ctxManagers += [ AutoDumpThread(proc,dump,xmlfilename) ]
305   # end of part of context managers
306   with contextlib.ExitStack() as stack:
307     for mgr in ctxManagers:
308       stack.enter_context(mgr)
309     executor.RunPy(proc,display,isPyThread=True,fromscratch=True) # same as RunW but releasing GIL
310   #
311   if proc.getEffectiveState() != pilot.DONE:
312     raise RuntimeError( proc.getErrorReport() )
313   #
314   if display > 0:
315       proc.writeDotInFile("titi")
316   #
317   if finalDump:
318     logging.info(f"Final dump requested : {finalDump}")
319     SALOMERuntime.schemaSaveStateUnsafe( proc, finalDump )
320
321 def EntryFromCoarseEntry( entry ):
322   if entry[:2] != "--":
323     raise RuntimeError("Unexpected entry")
324   return entry[2:]
325
326 def toDict( args ):
327   """
328   Convert argparse.Namespace to dict
329   """
330   return {EntryFromCoarseEntry(entry):getattr(args,key) for entry,key in KeyValnARGS}
331
332 def reprAfterArgParsing( args ):
333   """
334   Args:
335   -----
336
337   args (argparse.Namespace) : instance after parsing
338   """
339   return "\n".join( [ f"{EntryFromCoarseEntry(entry)} : {args[key]}" for entry,key in KeyValnARGS ] )
340
341 def getArgumentParser():
342   import argparse
343   parser = argparse.ArgumentParser()
344   parser.add_argument('xmlfilename',help = "XML file containing YACS schema to be executed")
345   parser.add_argument("-d", DisplayEntryInCMD, dest = DisplayKeyInARGS, type=int, const=1, nargs='?', default=0, help="Display dot files: 0=never to 3=very often")
346   parser.add_argument("-v", VerboseEntryInCMD, dest = VerboseKeyInARGS,help="Produce verbose output", action='store_true')
347   parser.add_argument("-s",StopOnErrorEntryInCMD,dest=StopOnErrorKeyInARGS,help="Stop on first error", action='store_true')
348   parser.add_argument("-e",DumpOnErrorEntryInCMD,dest=DumpOnErrorKeyInARGS, type=str, const='dumpErrorState.xml', default="", nargs='?', help="Stop on first error and dump state")
349   parser.add_argument("-g",DumpEntryInCMD,dest=DumpKeyInARGS, type=int, const=60, default=0, nargs='?', help="dump state")
350   parser.add_argument("-kt", KernelTraceEntryInCMD, dest = KernelTraceKeyInARGS,help="Produce verbose of SALOME/KERNEL", action='store_true')
351   parser.add_argument("-f",DumpStateEntryInCMD, dest =DumpStateKeyInARGS, type=str, const='finalDumpState.xml', default="", nargs='?', help="dump final state")
352   parser.add_argument("-l",LoadStateEntryInCMD, dest=LoadStateKeyInARGS, type=str, default="", help="Load State from a previous partial execution")
353   parser.add_argument("-x",SaveXMLSchemaEntryInCMD, dest=SaveXMLSchemaKeyInARGS, type=str, const="saveSchema.xml", nargs='?', default="", help = "dump xml schema")
354   parser.add_argument("-t",ShutdownEntryInCMD, dest = ShutdownKeyInARGS, type=int , default=3, help="Shutdown the schema: 0=no shutdown to 3=full shutdown")
355   parser.add_argument("-r",ResetEntryInCMD, dest = ResetKeyInARGS, type=int , default = 0, help="Reset the schema before execution: 0=nothing, 1=reset error nodes to ready state")
356   parser.add_argument("-i",InitPortEntryInCMD, dest = InitPortKeyInARGS, type=str, default =[], action='append', help="Initialisation value of a port, specified as bloc.node.port=value. For multiple settings use comma.")
357   parser.add_argument("-z",DoNotSqueezeEntryInCMD, dest = DoNotSqueezeKeyInARGS, help = "Desactivate squeeze memory optimization.", action='store_true')
358   parser.add_argument(CPUTimeResOfContainerEntryInCMD, dest = CPUTimeResOfContainerKeyInARGS, type=int, default = 10, help="Time in second between two measures of CPU/Mem in container processes")
359   parser.add_argument(HTOPFileEntryInCMD, dest = HTOPFileKeyInARGS, type=str, default ="", help="File name (if not empty) containing the result of measure of current process")
360   parser.add_argument(HTOPFileTimeResEntryInCMD, dest = HTOPFileTimeResKeyInARGS, type=int, default = 60, help="Time in second between between two measures of CPU/Mem of current process")
361   parser.add_argument(HTOPServerFileEntryInCMD, dest = HTOPServerFileKeyInARGS, type=str, default ="", help="File name (if not empty) containing the result of measure of all server processes")
362   parser.add_argument(HTOPServerFileTimeResEntryInCMD, dest = HTOPServerFileTimeResKeyInARGS, type=int, default = 30, help="Time in second between between two measures of CPU/Mem of any server process")
363   parser.add_argument(MonitoringDirsEntryInCMD, dest = MonitoringDirsInARGS, nargs='+', type=str, default =[], help="List of directories to be monitored")
364   parser.add_argument(MonitoringDirsResEntryInCMD, dest = MonitoringDirsResInARGS, nargs='+', type=str, default =[], help=f"List of files with result of monitoring of directories to be monitored (see {MonitoringDirsInARGS}). The size of lists are expected to be the same.")
365   parser.add_argument(MonitoringDirsTimeResEntryInCMD, dest = MonitoringDirsTimeResInARGS, nargs='+', type=int, default =[], help=f"List of time resolution (in second) of monitoring of directories to be monitored (see {MonitoringDirsInARGS}). The size of lists are expected to be the same.")
366   parser.add_argument("-w",ReplayOnErrorEntryInCMD,dest=ReplayOnErrorEntryInARGS,help="Mode of execution of YACS where all python execution are wrapped into a subprocess to be able to resist against failure (such as SIGSEV)", action='store_true')
367   parser.add_argument(IOREntryInCMD, dest = IORKeyInARGS, type=str, default ="", help="file inside which the ior of NS will be stored")
368   parser.add_argument("--options_from_json", dest = "options_from_json", type=str, default ="", help="Json file of options. If defined options in json will override those specified in command line.")
369   return parser
370
371 def mainRun( args, xmlFileName):
372   """
373   Args:
374   -----
375
376   args (dict) : options for treatment
377
378   """
379   global my_ior_ns,my_replay_on_error
380   from salome_utils import positionVerbosityOfLoggerRegardingState,setVerboseLevel,setVerbose
381   #
382   iorNS = args[IORKeyInARGS]
383   #
384   if iorNS:
385     my_ior_ns = iorNS
386   #
387   if args[ReplayOnErrorEntryInARGS]:
388     my_replay_on_error = True
389   #
390   if args[VerboseKeyInARGS]:
391     setVerbose( args[ KernelTraceKeyInARGS ] )
392     setVerboseLevel(logging.INFO)
393     positionVerbosityOfLoggerRegardingState()
394     logging.info( reprAfterArgParsing(args) )
395   #
396   proc = loadGraph( xmlFileName )
397   # work around a bug in Executor::Run when there are no tasks to launch.
398   if len(proc.getChildren()) == 0 :
399     return
400   patchGraph( proc, not args[DoNotSqueezeKeyInARGS], args[InitPortKeyInARGS], args[SaveXMLSchemaKeyInARGS], args[LoadStateKeyInARGS], args[ResetKeyInARGS], args[DisplayKeyInARGS])
401   executor = prepareExecution( proc, args[StopOnErrorKeyInARGS], args[DumpOnErrorKeyInARGS])
402   #
403   executeGraph( executor, xmlFileName, proc, args[DumpKeyInARGS], args[DumpStateKeyInARGS], args[DisplayKeyInARGS], args[ShutdownKeyInARGS], args[CPUTimeResOfContainerKeyInARGS],
404                args[HTOPFileKeyInARGS], args[HTOPFileTimeResKeyInARGS],
405                args[HTOPServerFileKeyInARGS], args[HTOPServerFileTimeResKeyInARGS], [(dirToMonitor,resFile,timeRes) for dirToMonitor,resFile,timeRes in zip(args[MonitoringDirsInARGS],args[MonitoringDirsResInARGS],args[MonitoringDirsTimeResInARGS])] )
406
407 def parseArgs():
408   """
409   Returns
410   -------
411
412   - args (dict) : dictionnary containing all args taken into account. If json, the params in json will override entries
413   - xmlFileName (str) : XML YACS schema
414   
415   """
416   import json
417   parser = getArgumentParser()
418   args = parser.parse_args()
419   iorNS = args.iorNS
420   xmlFileName = args.xmlfilename
421   optionFromJSon = args.options_from_json
422   args = toDict( args )
423   if optionFromJSon:
424     # in case of Json overrides 
425     with open( optionFromJSon ) as f:
426       opts_from_json = json.load( f )
427     for k,v in opts_from_json.items():
428       if k != EntryFromCoarseEntry(IOREntryInCMD) or v:# for IOR if v is null -> do not override
429         args[k] = v
430   # change key of args from entryCMD to KeyInARGS
431   args = {key:args[EntryFromCoarseEntry(entry)] for entry,key in KeyValnARGS}
432   return args, xmlFileName
433
434 if __name__ == "__main__":
435   args, xmlFileName = parseArgs()
436   mainRun( args, xmlFileName)