-// Copyright (C) 2019 CEA/DEN, EDF R&D
+// Copyright (C) 2019-2023 CEA, EDF
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
%module pylauncher
%{
-#include "SALOME_ResourcesCatalog_Parser.hxx"
-#include "Launcher_Job.hxx"
-#include "Launcher_Job_SALOME.hxx"
-#include "Launcher_Job_YACSFile.hxx"
#include "Launcher.hxx"
#include "ResourcesManager.hxx"
-static PyObject *convertJob(Launcher::Job *job, int owner)
-{
- PyObject *ret(nullptr);
- if(!job)
- {
- Py_XINCREF(Py_None);
- return Py_None;
- }
- if(dynamic_cast<Launcher::Job_YACSFile *>(job))
- return SWIG_NewPointerObj((void*)dynamic_cast<Launcher::Job_YACSFile *>(job),SWIGTYPE_p_Launcher__Job_YACSFile,owner);
- throw LauncherException("Not recognized type of job on downcast !");
-}
-
-%}
-
-%include std_string.i
+#include <sstream>
-%typemap(out) const std::list<std::string>&
+struct ResourceDefinition_cpp
{
- std::size_t i;
- std::list<std::string>::const_iterator iL;
- $result = PyList_New($1->size());
- for (i=0, iL=$1->cbegin(); iL!=$1->cend(); i++, iL++)
- PyList_SetItem($result,i,PyUnicode_FromString((*iL).c_str()));
-}
+public:
+ std::string name;
+ std::string hostname;
+ std::string type;
+ std::string protocol;
+ std::string username;
+ std::string applipath;
+ std::string OS;
+ int mem_mb;
+ int cpu_clock;
+ int nb_node;
+ int nb_proc_per_node;
+ std::string batch;
+ std::string mpiImpl;
+ std::string iprotocol;
+ bool can_launch_batch_jobs;
+ bool can_run_containers;
+ std::string working_directory;
+};
-%typemap(out) const std::map<std::string, std::string> &
+std::shared_ptr<ResourcesManager_cpp> HandleToLocalInstance(const std::string& ptrInStringFrmt)
{
- $result = PyDict_New();
- for(std::map<std::string, std::string>::const_iterator iL=$1->cbegin();iL!=$1->cend();iL++)
- {
- PyObject *a(PyUnicode_FromString((*iL).first.c_str()));
- PyObject *b(PyUnicode_FromString((*iL).second.c_str()));
- PyDict_SetItem($result,a,b);
- Py_DECREF(a); Py_DECREF(b);
- }
+ std::istringstream iss(ptrInStringFrmt);
+ void *zePtr(nullptr);
+ iss >> zePtr;
+ std::shared_ptr<ResourcesManager_cpp> *effPtr = reinterpret_cast<std::shared_ptr<ResourcesManager_cpp> *>(zePtr);
+ std::shared_ptr<ResourcesManager_cpp> ret(*effPtr);
+ delete effPtr;
+ return ret;
}
+%}
-%typemap(out) std::vector<std::string> *
-{
- std::size_t i;
- std::vector<std::string>::const_iterator iL;
- $result = PyList_New($1->size());
- for (i=0, iL=$1->cbegin(); iL!=$1->cend(); i++, iL++)
- PyList_SetItem($result,i,PyUnicode_FromString((*iL).c_str()));
-}
+%include "std_string.i"
+%include "std_vector.i"
+%include "std_list.i"
+%include "std_map.i"
-%typemap(out) Launcher::Job *
-{
- $result=convertJob($1,$owner);
-}
-
-%exception
-{
- try {
- $function
- }
- catch(LauncherException& e)
- {
- SWIG_exception_fail(SWIG_RuntimeError,e.msg.c_str());
- }
- catch (...)
- {
- SWIG_exception_fail(SWIG_UnknownError, "Unknown");
- }
-}
+namespace std {
+ %template(list_int) list<int>;
+ %template(list_str) list<string>;
+ %template(vector_str) vector<string>;
+ %template(map_ss) map<string,string>;
+};
+// see ResourceParameters from SALOME_ResourcesManager.idl
+// see resourceParams from ResourcesManager.hxx
+%naturalvar JobParameters_cpp::componentList;
+%naturalvar JobParameters_cpp::resourceList;
struct resourceParams
{
resourceParams();
std::vector<std::string> resourceList;
};
-class ResourcesManager_cpp
+// see JobParameters from SALOME_Launcher.idl
+// see JobParameters_cpp from Launcher.hxx
+%naturalvar JobParameters_cpp::in_files;
+%naturalvar JobParameters_cpp::out_files;
+%naturalvar JobParameters_cpp::specific_parameters;
+struct JobParameters_cpp
{
- public:
- ResourcesManager_cpp(const char *xmlFilePath);
+public:
+ std::string job_name;
+ std::string job_type;
+ std::string job_file;
+ std::string pre_command;
+ std::string env_file;
+ std::list<std::string> in_files;
+ std::list<std::string> out_files;
+ std::string work_directory;
+ std::string local_directory;
+ std::string result_directory;
+ std::string maximum_duration;
+ resourceParams resource_required;
+ std::string queue;
+ std::string partition;
+ bool exclusive;
+ unsigned int mem_per_cpu;
+ std::string wckey;
+ std::string extra_params;
+ std::map<std::string, std::string> specific_parameters;
+ std::string launcher_file;
+ std::string launcher_args;
+};
+
+// see ResourceDefinition from SALOME_ResourcesManager.idl
+// no other c++ equivalent. Convertion from ParserResourcesType
+struct ResourceDefinition_cpp
+{
+public:
+ std::string name;
+ std::string hostname;
+ std::string type;
+ std::string protocol;
+ std::string username;
+ std::string applipath;
+ std::string OS;
+ int mem_mb;
+ int cpu_clock;
+ int nb_node;
+ int nb_proc_per_node;
+ std::string batch;
+ std::string mpiImpl;
+ std::string iprotocol;
+ bool can_launch_batch_jobs;
+ bool can_run_containers;
+ std::string working_directory;
};
-class ParserResourcesType
+%exception
+{
+ try
+ {
+ $function
+ }
+ catch (ResourcesException& e)
+ {
+ SWIG_exception_fail(SWIG_RuntimeError, e.msg.c_str());
+ }
+ catch(...)
+ {
+ SWIG_exception_fail(SWIG_RuntimeError,"Unknown exception");
+ }
+}
+
+%include <std_shared_ptr.i>
+%shared_ptr(ResourcesManager_cpp)
+
+class ResourcesManager_cpp
{
public:
- ParserResourcesType();
- ~ParserResourcesType();
- std::string getAccessProtocolTypeStr() const;
- std::string getResourceTypeStr() const;
- std::string getBatchTypeStr() const;
- std::string getMpiImplTypeStr() const;
- std::string getClusterInternalProtocolStr() const;
- std::string getCanLaunchBatchJobsStr() const;
- std::string getCanRunContainersStr() const;
-
- void setAccessProtocolTypeStr(const std::string & protocolTypeStr);
- void setResourceTypeStr(const std::string & resourceTypeStr);
- void setBatchTypeStr(const std::string & batchTypeStr);
- void setMpiImplTypeStr(const std::string & mpiImplTypeStr);
- void setClusterInternalProtocolStr(const std::string & internalProtocolTypeStr);
- void setCanLaunchBatchJobsStr(const std::string & canLaunchBatchJobsStr);
- void setCanRunContainersStr(const std::string & canRunContainersStr);
+ ResourcesManager_cpp(const char *xmlFilePath);
+ std::vector<std::string> GetFittingResources(const resourceParams& params);
+ void WriteInXmlFile(std::string xml_file);
+ void DeleteAllResourcesInCatalog();
+%extend
+{
+ ResourceDefinition_cpp GetResourceDefinition(const std::string& name)
+ {
+ ResourceDefinition_cpp swig_result;
+ ParserResourcesType cpp_result = $self->GetResourcesDescr(name);
+
+ swig_result.name = cpp_result.Name;
+ swig_result.hostname = cpp_result.HostName;
+ swig_result.type = cpp_result.getResourceTypeStr();
+ swig_result.protocol = cpp_result.getAccessProtocolTypeStr();
+ swig_result.username = cpp_result.UserName;
+ swig_result.applipath = cpp_result.AppliPath;
+ swig_result.OS = cpp_result.OS;
+ swig_result.mem_mb = cpp_result.DataForSort._memInMB;
+ swig_result.cpu_clock = cpp_result.DataForSort._CPUFreqMHz;
+ swig_result.nb_node = cpp_result.DataForSort._nbOfNodes;
+ swig_result.nb_proc_per_node = cpp_result.DataForSort._nbOfProcPerNode;
+ swig_result.batch = cpp_result.getBatchTypeStr();
+ swig_result.mpiImpl = cpp_result.getMpiImplTypeStr();
+ swig_result.iprotocol = cpp_result.getClusterInternalProtocolStr();
+ swig_result.can_launch_batch_jobs = cpp_result.can_launch_batch_jobs;
+ swig_result.can_run_containers = cpp_result.can_run_containers;
+ swig_result.working_directory = cpp_result.working_directory;
+
+ return swig_result;
+ }
+
+ void DeleteResourceInCatalog(const std::string& name)
+ {
+ $self->DeleteResourceInCatalog(name.c_str());
+ }
+
+ void AddResourceInCatalog (const ResourceDefinition_cpp& new_resource)
+ {
+ ParserResourcesType new_resource_cpp;
+ new_resource_cpp.Name = new_resource.name;
+ new_resource_cpp.HostName = new_resource.hostname;
+ new_resource_cpp.setResourceTypeStr( new_resource.type );
+ new_resource_cpp.setAccessProtocolTypeStr( new_resource.protocol );
+ new_resource_cpp.UserName = new_resource.username;
+ new_resource_cpp.AppliPath = new_resource.applipath;
+ new_resource_cpp.OS = new_resource.OS;
+ new_resource_cpp.DataForSort._memInMB = new_resource.mem_mb;
+ new_resource_cpp.DataForSort._CPUFreqMHz = new_resource.cpu_clock;
+ new_resource_cpp.DataForSort._nbOfNodes = new_resource.nb_node;
+ new_resource_cpp.DataForSort._nbOfProcPerNode = new_resource.nb_proc_per_node;
+ new_resource_cpp.setBatchTypeStr(new_resource.batch);
+ new_resource_cpp.setMpiImplTypeStr(new_resource.mpiImpl);
+ new_resource_cpp.setClusterInternalProtocolStr(new_resource.iprotocol);
+ new_resource_cpp.can_launch_batch_jobs = new_resource.can_launch_batch_jobs;
+ new_resource_cpp.can_run_containers = new_resource.can_run_containers;
+ new_resource_cpp.working_directory = new_resource.working_directory;
+ $self->AddResourceInCatalog(new_resource_cpp);
+ }
+
+ void ParseXmlFiles()
+ {
+ $self->ParseXmlFiles();
+ }
+
+ std::vector<std::string> GetListOfEntries() const
+ {
+ const MapOfParserResourcesType& allRes = $self->GetList();
+ std::vector<std::string> ret;
+ for(auto it : allRes)
+ ret.push_back(it.first);
+ return ret;
+ }
+}
};
-%feature("unref") Launcher::Job "$this->decrRef();"
-%feature("unref") Launcher::Job_SALOME "$this->decrRef();"
-%feature("unref") Launcher::Job_YACSFile "$this->decrRef();"
+%inline
+{
+ std::shared_ptr<ResourcesManager_cpp> HandleToLocalInstance(const std::string& ptrInStringFrmt);
+}
-namespace Launcher
+%exception
{
- class Job
+ try
{
- public:
- Job();
- virtual ~Job();
- void setState(const std::string & state);
- std::string getState() const;
- std::string getAssignedHostnames();
- void setNumber(const int & number);
- int getNumber();
- virtual void setResourceDefinition(const ParserResourcesType & resource_definition);
- ParserResourcesType getResourceDefinition() const;
- // Common parameters
- void setJobName(const std::string & job_name);
- virtual void setJobFile(const std::string & job_file);
- void setPreCommand(const std::string & preCommand);
- void setWorkDirectory(const std::string & work_directory);
- void setLocalDirectory(const std::string & local_directory);
- void setResultDirectory(const std::string & result_directory);
- void add_in_file(const std::string & file);
- void add_out_file(const std::string & file);
- void setMaximumDuration(const std::string & maximum_duration);
- void setResourceRequiredParams(const resourceParams & resource_required_params);
- void setQueue(const std::string & queue);
- void setPartition(const std::string & partition);
- void setEnvFile(const std::string & env_file);
- void setExclusive(bool exclusive);
- void setExclusiveStr(const std::string & exclusiveStr);
- void setMemPerCpu(unsigned long mem_per_cpu);
- void setWCKey(const std::string & wckey);
- void setExtraParams(const std::string & extra_params);
- void setReference(const std::string & reference);
- // For COORM
- void setLauncherFile(const std::string & launcher_file);
- void setLauncherArgs(const std::string & launcher_args);
-
- std::string getJobName() const;
- std::string getJobFile() const;
- std::string getPreCommand() const;
- std::string getWorkDirectory() const;
- std::string getLocalDirectory() const;
- std::string getResultDirectory() const;
- const std::list<std::string> & get_in_files() const;
- const std::list<std::string> & get_out_files() const;
- std::string getMaximumDuration() const;
- resourceParams getResourceRequiredParams() const;
- std::string getQueue() const;
- std::string getPartition() const;
- std::string getEnvFile() const;
- std::string getJobType() const;
- bool getExclusive() const;
- std::string getExclusiveStr() const;
- unsigned long getMemPerCpu() const;
- std::string getWCKey() const;
- std::string getExtraParams() const;
- std::string getReference() const;
- std::string getLauncherFile() const;
- std::string getLauncherArgs() const;
- std::string updateJobState();
-
- void addSpecificParameter(const std::string & name, const std::string & value);
- const std::map<std::string, std::string> & getSpecificParameters() const;
- virtual void checkSpecificParameters();
-
- // Checks
- void checkMaximumDuration(const std::string & maximum_duration);
- void checkResourceRequiredParams(const resourceParams & resource_required_params);
-
- // Helps
- long convertMaximumDuration(const std::string & maximum_duration);
- std::string getLaunchDate() const;
-
- void stopJob();
- void removeJob();
- virtual void update_job() = 0;
- };
-
- class Job_SALOME : public Job
+ $function
+ }
+ catch (LauncherException& e)
{
- private:
- Job_SALOME();
- public:
- virtual ~Job_SALOME();
- virtual void setResourceDefinition(const ParserResourcesType & resource_definition);
- virtual void update_job();
-#ifdef WITH_LIBBATCH
- public:
- std::string buildSalomeScript(Batch::Parametre params);
-#endif
- };
-
- class Job_YACSFile : public Job_SALOME
+ SWIG_exception_fail(SWIG_RuntimeError, e.msg.c_str());
+ }
+ catch(...)
{
- public:
- Job_YACSFile();
- virtual ~Job_YACSFile();
- virtual void setJobFile(const std::string & job_file);
- virtual void checkSpecificParameters();
- };
+ SWIG_exception_fail(SWIG_RuntimeError,"Unknown exception");
+ }
}
class Launcher_cpp
public:
Launcher_cpp();
virtual ~Launcher_cpp();
- void createJob(Launcher::Job * new_job);
+ int createJob(const JobParameters_cpp& job_parameters);
void launchJob(int job_id);
- const char * getJobState(int job_id);
- const char * getAssignedHostnames(int job_id); // Get names or ids of hosts assigned to the job
+ std::string getJobState(int job_id);
+ std::string getAssignedHostnames(int job_id); // Get names or ids of hosts assigned to the job
+ void exportInputFiles(int job_id);
void getJobResults(int job_id, std::string directory);
void clearJobWorkingDir(int job_id);
bool getJobDumpState(int job_id, std::string directory);
void removeJob(int job_id);
std::string dumpJob(int job_id);
int restoreJob(const std::string& dumpedJob);
+ JobParameters_cpp getJobParameters(int job_id);
std::list<int> loadJobs(const char* jobs_file);
void saveJobs(const char* jobs_file);
long createJobWithFile(std::string xmlExecuteFile, std::string clusterName);
- std::map<int, Launcher::Job *> getJobs();
- void addJobDirectlyToMap(Launcher::Job * new_job);
- void SetResourcesManager( ResourcesManager_cpp *rm );
- Launcher::Job * findJob(int job_id);
+ void SetResourcesManager(std::shared_ptr<ResourcesManager_cpp>& rm );
};
%pythoncode %{
-def sendJobToSession(self, job_id, sessionId=None):
- """Send job specified by its job_id in self to a remote SALOME session.
- Doing so, it's possible to follow the job created locally into the JobManager module of the target SALOME session.
- SALOME session is specified by the file pointed by the content of OMNIORB_CONFIG environement var. The content of this var is called sessionId.
- If sessionId is let untouched, the current OMNIORB_CONFIG environement var is used.
- If this method fails to connect to the target SALOME session a RuntimeError exception will be thrown.
- """
- def job_type_traducer(jyf):
- dico = {'Job_YACSFile' : 'yacs_file'}
- st = type(jyf).__name__
- if st not in dico:
- raise RuntimeError("Not recognized type %s !"%st)
- return dico[st]
- #
- def resource_required_func(jyf):
- import Engines
- rp =jyf.getResourceRequiredParams()
- l12 = [('name', None), ('hostname', None), ('can_launch_batch_jobs', None), ('can_run_containers', None), ('OS', None), ('componentList', None), ('nb_proc', None), ('mem_mb', None), ('cpu_clock', None), ('nb_node', None), ('nb_proc_per_node', None), ('policy', lambda x: 'cycl'), ('resList', lambda x: x.resourceList)]
- kw2={}
- for a,b in l12:
- if a and not b:
- kw2[a]=getattr(rp,a)
- else:
- if a and b:
- kw2[a]=b(rp)
- return Engines.ResourceParameters(**kw2)
- #
- filest = self.dumpJob(job_id);
- # Connect to SALOME session a retrieve its SalomeLauncher object
- import os
- if sessionId is not None:
- os.environ["OMNIORB_CONFIG"]=sessionId
- import Engines
- import orbmodule
- try:
- clt=orbmodule.client()
- sl = clt.Resolve("SalomeLauncher")
- except:
- raise RuntimeError("Fail to connect to the remote SALOME session.")
- # swig to CORBA translation
- # Job_YACSFile -> Engines.JobParameters and resourceParams -> Engines.ResourceParameters()
- l21= [('job_name', None), ('job_type', job_type_traducer), ('job_file', None), ('pre_command', None), ('env_file', None), ('in_files', lambda x: x.get_in_files()), ('out_files', lambda x: x.get_out_files()), ('work_directory', None), ('local_directory', None), ('result_directory', None), ('maximum_duration', None), ('resource_required',resource_required_func) , ('queue', None), ('partition', None), ('exclusive', None), ('mem_per_cpu', None), ('wckey', lambda x: x.getWCKey() ), ('extra_params', None), ('specific_parameters', lambda x: list(x.getSpecificParameters().items())), ('launcher_file', None), ('launcher_args', None)]
- kw={}
- jyf = self.findJob(job_id)
- for a,b in l21:
- if not b and a:
- kw[a]=eval("jyf.get%s()"%"".join(["%s%s"%(elt2[0].upper(),elt2[1:]) for elt2 in a.split("_")]),{"jyf" : jyf})
- else:
- if a and b:
- kw[a]=b(jyf)
- jyc = Engines.JobParameters(**kw)
- ########################
- bpc = sl.createJob(jyc)
- sl.restoreJob(filest)
-
-Launcher_cpp.sendJobToSession = sendJobToSession
-del sendJobToSession
+def CreateSSHContainerResource(hostname,applipath,nbOfNodes=1):
+ return CreateContainerResource(hostname,applipath,"ssh",nbOfNodes)
+
+def CreateSRUNContainerResource(hostname,applipath,nbOfNodes=1):
+ return CreateContainerResource(hostname,applipath,"srun",nbOfNodes)
+
+def CreateContainerResource(hostname,applipath,protocol,nbOfNodes=1):
+ import getpass
+ ret = ResourceDefinition_cpp()
+ ret.name = hostname.split(".")[0]
+ ret.hostname = ret.name
+ ret.protocol = protocol
+ ret.applipath = applipath
+ ret.nb_node = nbOfNodes
+ ret.nb_proc_per_node = 1
+ ret.can_run_containers = True
+ ret.can_launch_batch_jobs = False
+ ret.mpiImpl = "no mpi"
+ ret.iprotocol = protocol
+ ret.type = "single_machine"
+ ret.username = getpass.getuser()
+ return ret
+
+def ResourceDefinition_cpp_repr(self):
+ pat0 = "{} = {}"
+ pat1 = "{} = \"{}\""
+ data = [("name","name",pat0),
+ ("hostname","hostname",pat0),
+ ("type","type",pat0),
+ ("protocol","protocol",pat0),
+ ("userName","username",pat0),
+ ("appliPath","applipath",pat1),
+ ("mpi","mpiImpl",pat0),
+ ("nbOfNodes","nb_node",pat0),
+ ("nbOfProcPerNode","nb_proc_per_node",pat0),
+ ("canRunContainer","can_run_containers",pat0)
+ ]
+ ret = [c.format(a,getattr(self,b)) for a,b,c in data]
+ return "\n".join( ret )
+
+def ResourcesManager_cpp_GetList(self):
+ return {name:self.GetResourceDefinition(name) for name in self.GetListOfEntries()}
+
+def ResourcesManager_cpp___getitem__(self,name):
+ return self.GetResourceDefinition(name)
+
+def ResourcesManager_cpp___repr__(self):
+ return str( self.GetList() )
+
+def RetrieveRMCppSingleton():
+ import KernelLauncher
+ return HandleToLocalInstance( KernelLauncher.RetrieveInternalInstanceOfLocalCppResourcesManager() )
+
+def GetPlayGroundInsideASlurmJob():
+ import subprocess as sp
+ cont = sp.check_output(["srun","hostname"])
+ nodesMul = [elt for elt in cont.decode().split("\n") if elt != ""]
+ from collections import defaultdict
+ d = defaultdict(int)
+ for elt in nodesMul:
+ d[elt]+=1
+ return d
+
+def BuildCatalogFromScratch(protocol):
+ import os
+ d = GetPlayGroundInsideASlurmJob()
+ rmcpp = RetrieveRMCppSingleton()
+ rmcpp.DeleteAllResourcesInCatalog()
+ for k,v in d.items():
+ contRes = CreateContainerResource(hostname=k,applipath=os.environ["APPLI"],protocol=protocol,nbOfNodes=v)
+ rmcpp.AddResourceInCatalog(contRes)
+
+def GetRequestForGiveContainer(hostname, contName):
+ import Engines
+ import os
+ rp=Engines.ResourceParameters(name=hostname,
+ hostname=hostname,
+ can_launch_batch_jobs=False,
+ can_run_containers=True,
+ OS="Linux",
+ componentList=[],
+ nb_proc=1,
+ mem_mb=1000,
+ cpu_clock=1000,
+ nb_node=1,
+ nb_proc_per_node=1,
+ policy="first",
+ resList=[])
+
+ cp=Engines.ContainerParameters(container_name=contName,
+ mode="start",
+ workingdir=os.path.expanduser("~"),
+ nb_proc=1,
+ isMPI=False,
+ parallelLib="",
+ resource_params=rp)
+ return cp
+
+ResourceDefinition_cpp.repr = ResourceDefinition_cpp_repr
+ResourceDefinition_cpp.__repr__ = ResourceDefinition_cpp_repr
+ResourcesManager_cpp.GetList = ResourcesManager_cpp_GetList
+ResourcesManager_cpp.__getitem__ = ResourcesManager_cpp___getitem__
+ResourcesManager_cpp.__repr__ = ResourcesManager_cpp___repr__
%}